Bigdata ucl Wiki

Slides

The slides of the presentation (October 13, 2014) about MapReduce are uploaded here. File:MapReduce.pdf The slides of the project presentation (December 17, 2014) about Hadoop and PyMR are here. File:SlidesProject.pdf

Python implementation of MapReduce (PyMR)

The first goal of this project was to implement an easy-to-use, user-friendly Python version of Map-Reduce, making it simple to design and prototype Map-Reduce algorithms.

How to use PyMR?

This section will give you general instructions in order to use the PyMR library. The complete documentation of PyMR is available here.

Step 0: Software requirements

If you want to use the PyMR library, you must have:

  • Python 2.7.x (not Python 3!)
  • PyMR library, available here.

Don't forget to add the line

from MapReduce import MapReduce

at the beginning of your script.

Step 1: Parsing data

The data given to the MapReduce algorithm must be files with one input per line. For instance, if you have a file containing the text "Hello World!" and the mapper expects a single word as input, then you have to transform your file into:

hello
world                      

You can also use the FileHelper class, available in the PyMR library, which helps you parse your input files easily. The full documentation is available here. Remember that you can also contribute to improving the PyMR library: if you write a new parsing function that could be helpful to others, you are encouraged to submit it :-).
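
As a minimal sketch, a raw text file can be turned into a one-word-per-line file with the same helper that is used in the word-count example later on this page (the file paths here are placeholders):

from FileHelper import FileHelper

# Raw text files, e.g. containing "Hello World!"
inFiles = ['files/rawText.txt']
# Parsed output: one word per line, ready for the mapper
inFileParsed = 'files/rawTextParsed'
FileHelper.transformTextIntoListOfWords(inFiles, inFileParsed)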

Step 2: Create the Mapper and the Reducer

Since the MapReduce algorithm requires a user-defined mapper and reducer, you need to implement a mapper class and a reducer class yourself.

The Mapper

The mapper needs to have a method called "map", which takes two arguments: self and a MapContext object. You can read the documentation of the MapContext class here. Here is an empty mapper class:

 
class EmptyMapper:
    def map(self,theContext):
        iterator = theContext.iterator
        input = iterator.getNext()
        while input is not None:
            # User-defined processing: turn the raw input into a key-value pair.
            (key,value) = something(input)
            theContext.putKeyValue(key,value)
            input = iterator.getNext()

The Reducer

The reducer needs to have a method called "reduce", which takes two arguments: self and a ReduceContext object. You can read the documentation of the ReduceContext class here. Here is an empty reducer class:

class EmptyReducer:
    def reduce(self,context):
        key = context.key
        iterator = context.iterator
        value = iterator.getNext()
        while value is not None:
            # do something with value.
            value = iterator.getNext()        
        return # something

Step 3: Create the MapReducer and launch the algorithm

Once the mapper and reducer classes are created, you just need to instantiate them, create an instance of the MapReduce class and call the execute() routine. You can see a generic example below.

from MapReduce import MapReduce
from YourMapper import YourMapper
from YourReducer import YourReducer
theMapper = YourMapper(MapInputs)
theReducer = YourReducer(ReduceInputs)
theMapReducer = MapReduce(theMapper,theReducer,listOfInputParsedFiles)
resultDictionary = theMapReducer.execute()

The execute() routine gives you as output a dictionary with all the key/value pairs generated by your reducer. For more information, you can read the documentation of the MapReduce class here.

Warning: since you write your own mapper and reducer, you might be tempted to do unusual things inside the map and reduce methods, like creating and accessing instance variables. Note that if you intend to run your program with the parallel version of PyMR, the map and reduce functions will be called multiple times simultaneously, so don't write to anything outside of them. You can safely read shared variables, but never write to them!
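
As an illustration, here is a sketch of what to avoid and what is safe (the word filtering is purely hypothetical): the first mapper accumulates a count in an instance variable, which several threads would write to simultaneously, while the second one only reads a shared attribute and sends all of its output through the context.

class UnsafeMapper:
    def __init__(self):
        self.count = 0                          # shared state

    def map(self,theContext):
        word = theContext.iterator.getNext()
        while word is not None:
            self.count = self.count + 1         # WRONG: written by several threads at once
            theContext.putKeyValue(word, 1)
            word = theContext.iterator.getNext()

class SafeMapper:
    def __init__(self,stopWords):
        self.stopWords = stopWords              # shared state that is only read: fine

    def map(self,theContext):
        word = theContext.iterator.getNext()
        while word is not None:
            if word not in self.stopWords:      # reading shared state is safe
                theContext.putKeyValue(word, 1) # all output goes through the context
            word = theContext.iterator.getNext()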

Counting words with PyMR

You can find below a complete example of counting words with PyMR. You can also run the file demo_CountingWords.py.

Input File

Create a file named dataFile in C:\mapReduceCountingWords with some content, for example :

Hello! It is a Hello world!

Mapper

class MapperCountingWords:
    def map(self,theContext):
        mapIterator = theContext.iterator
        word = mapIterator.getNext()
        while word:
            # Emit the pair <word, 1> for every word of the input.
            theContext.putKeyValue(word.rstrip('\n'), 1)
            word = mapIterator.getNext()

Reducer

class ReducerCountingWords:
    def reduce(self,context):
        key = context.key;
        iterator = context.iterator;
        totValue = 0;
        actualValue = iterator.getNext()
        while actualValue:
            totValue = totValue + int(actualValue)
            actualValue = iterator.getNext()
        return totValue

ExecutionFile

from MapperCountingWords import MapperCountingWords
from ReducerCountingWords import ReducerCountingWords
from MapReduce import MapReduce
from FileHelper import FileHelper

# Create instances for mapper and reducer
theMapper = MapperCountingWords();
theReducer = ReducerCountingWords();

# parse the file : one word/line
inFiles = ['C:/mapReduceCountingWords/dataFile'];

# we can have more than one text file
inFileParsed = 'files/dataFileParsed';
FileHelper.transformTextIntoListOfWords(inFiles,inFileParsed)

# MapReduce
theMapReducer = MapReduce(theMapper,theReducer,[inFileParsed],silent=0,nThreads=5)
resultDict = theMapReducer.execute()

# Write output
outFileDirectory = 'C:/mapReduceCountingWords/'
outfileName = 'countingWordsResults.txt';
FileHelper.writeDictionnary(outFileDirectory+outfileName,resultDict)

Output

The output file, named countingWordsResults.txt, will be written in C:\mapReduceCountingWords\. The content of this file should be:

a : 1
world : 1
is : 1
hello : 2
it : 1

Matrix-vector multiplication with PyMR

You can find below a complete example of a matrix-vector multiplication with PyMR. The goal of this example is to compute a matrix-vector product with MapReduce.

The general Map-Reduce algorithm for computing a matrix-vector multiplication is the following. We want to compute x = A*b, where A is a sparse matrix and b is a vector. The Map-Reduce algorithm is then:

  • Map : the input is an entry of the matrix A stored as a triplet (i, j, A_ij). The output is the following key-value pair: (i, A_ij * b_j).
  • Reduce : the input is a key-values pair (i, [A_ij * b_j for all j]) and the output is (i, SUM_j A_ij * b_j), i.e. the i-th entry of x.

The specific operation made in this example is (using Matlab notation):

[1 2 ; 1 0] * [1 ; 1]

The result is indeed :

[3 ; 1]
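
To make the data flow concrete, here is a minimal plain-Python sketch (independent of PyMR) of what the map and reduce steps compute for this particular example; the sparse entries and the vector are hard-coded:

# Sparse entries (i, j, A_ij) of A = [1 2 ; 1 0] and the vector b = [1 ; 1]
entries = [(1, 1, 1.0), (1, 2, 2.0), (2, 1, 1.0)]
b = [1.0, 1.0]

# Map: each entry (i, j, A_ij) becomes the key-value pair (i, A_ij * b_j)
pairs = [(i, Aij * b[j-1]) for (i, j, Aij) in entries]

# Reduce: sum the values of each key i, giving the i-th entry of A*b
result = {}
for (i, value) in pairs:
    result[i] = result.get(i, 0.0) + value

print(result)   # {1: 3.0, 2: 1.0}, i.e. [3 ; 1]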

Input files

Create a file named "A_matrix" in C:\mapReduceMatrixMultiplication with the following content. Note that zero values are omitted (this is a sparse notation).

1 1 1.0
1 2 2.0
2 1 1.0

In addition, create the file "b_vector" in C:\mapReduceMatrixMultiplication with the following content.

1
1

Mapper

The following mapper loads the b vector from the file into memory at initialization.

class MapperMatrixVector:

    def __init__(self,vectorFile):
        num_lines = sum(1 for line in open(vectorFile))
        self.vector = [0 for i in range(num_lines)];
        f=open(vectorFile, "r")
        counter = 0;
        for line in f:
            self.vector[counter] = float(line.rstrip('\n'))
            counter = counter+1;

    def map(self,theContext):
        mapIterator = theContext.iterator;        
        entry = mapIterator.getNext();
        while entry:
            entry = entry.split();
            line = entry[0].rstrip('\n');
            col = entry[1].rstrip('\n');
            val = float(entry[2])
            theContext.putKeyValue(line,val*self.vector[int(col)-1] )
            entry = mapIterator.getNext();

Reducer

class ReducerMatrixVector:

    def reduce(self,context):
        iterator = context.iterator;
        totValue = 0;
        actualValue = iterator.getNext()
        while actualValue is not None:
            totValue = totValue + float(actualValue)
            actualValue = iterator.getNext()        
        return totValue

ExecutionFile

from MapperMatrixVector import MapperMatrixVector
from ReducerMatrixVector import ReducerMatrixVector
from MapReduce import MapReduce
from FileHelper import FileHelper

# Create instances for mapper and reducer
# Note that the vector is stored in the instance
theReducerMatrixVector = ReducerMatrixVector();
theMapperMatrixVector = MapperMatrixVector('C:/mapReduceMatrixMultiplication/b_vector');

# the file where the matrix is stored
matrixFile = ['C:/mapReduceMatrixMultiplication/A_matrix'];

# MapReduce
theMapReducerMatrixVector = MapReduce(theMapperMatrixVector,theReducerMatrixVector,matrixFile,0,1)
resultDictMatrixVector = theMapReducerMatrixVector.execute();

# Build the output list in row order (row 1 first) and write it to a file
resultListMatrixVector = [];
for i in range(1,len(resultDictMatrixVector)+1):
    if resultDictMatrixVector.has_key(str(i)):
        resultListMatrixVector.append(resultDictMatrixVector[str(i)])
    else:
        resultListMatrixVector.append(0)
FileHelper.writeListInFile('C:/mapReduceMatrixMultiplication/MatrixVectorResults.txt',resultListMatrixVector)

Output

The output is the file "MatrixVectorResults.txt". As expected, the file contains the following lines :

3.0
1.0

Performance

We can now examine the performance of our MapReduce implementation. The matrix-vector multiplication is a simple algorithm, well suited for performance analysis. Here, we created sparse tridiagonal matrices of increasing sizes. Execution times are presented in the following table.

Performance of the Matrix-Vector multiplication
Size of the matrix Execution time [s]
100 0.05
1 000 0.4
10 000 5.18
100 000 320

As expected, the execution time grows linearly at first, but performance then deteriorates, probably because the data no longer fits in the disk cache.
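
For reference, a sparse tridiagonal test matrix in the same (row column value) format can be generated with a few lines of Python (a sketch; the size n, the entry values and the file name are arbitrary placeholders):

n = 10000                                    # matrix size (placeholder)
with open('A_tridiag', 'w') as f:
    for i in range(1, n+1):
        for j in (i-1, i, i+1):              # tridiagonal pattern
            if 1 <= j <= n:
                f.write('%d %d 1.0\n' % (i, j))   # all entries arbitrarily set to 1.0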

Simulation of picture similarities

In this section we will see a general model for computing similarities between pairs of pictures over a whole set of pictures. Since the goal of this project is not signal processing, we will just use a very simple model of the function that computes the similarity between two pictures.

  • Mapper: the goal of the mapper is to generate all possible pairs of pictures (without the symmetric duplicates). To do this, we use the picture indices i = 0, ..., N-1 as keys (where N is the number of pictures). For each key i, we emit every value j subject to j >= i. In our case, the input of the mapper is simply the first picture i: the key is this picture, and the values are the list of pictures j with j >= i.
  • Reducer: the goal of the reducer is to analyse all key-values pairs with a function computeSimilarity(i, j) which gives a similarity measure between two pictures. The reducer keeps the key, which is the picture i, iterates over all pictures j in the list, computes the similarity between i and j, and stores the similar pictures in a list which is returned at the end of the execution.

Input file

The input file will contain only references to the pictures (or the pictures themselves). To keep the code simple, we will just write a number for each picture (we can assume that with this number we can retrieve the right picture). For example, we can generate this content:

0
1
... (continue)
18
19
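
Such an input file can be generated with a couple of lines of Python (a sketch; the file name is a placeholder):

# Write one picture index per line: 0, 1, ..., 19
with open('picturesInput', 'w') as f:
    for i in range(20):
        f.write('%d\n' % i)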

Mapper

The goal of the mapper is to create all possible pairs of pictures. We assume that the similarity between two pictures is symmetric, so we will only generate pairs (i, j) where j >= i. Also, to make the mapper simpler, we suppose that we know in advance the size of the set of pictures that needs to be analysed.

class MapperSimilarity:

    def __init__ (self,maxPictures):
        self.maxPictures = maxPictures;
    
    def map(self,theContext):
        mapIterator = theContext.iterator;
        pictureIdx = mapIterator.getNext();
        while pictureIdx:
            pictureIdx = int(pictureIdx.rstrip('\n'));
            # Generate all pairs (i,j) where j >= i.
            for i in range(pictureIdx,self.maxPictures):
                theContext.putKeyValue(pictureIdx, i)
            pictureIdx = mapIterator.getNext();

Reducer

The reducer will use the computeSimilarity function, which computes the similarity between two pictures.

class ReducerSimilarity:
    def reduce(self,context):
        pict1 = int(context.key);
        iterator = context.iterator;
        similarVec = [];
        pict2 = iterator.getNext()
        while pict2:
            pict2 = int(pict2)
            if(ReducerSimilarity.computeSimilarity(pict1,pict2) == 1):
                similarVec.append(pict2)
            pict2 = iterator.getNext()
        return similarVec

    @staticmethod
    def computeSimilarity(pict1Idx,pict2Idx):
        # User-defined function.
        # Returns 1 if the pictures are similar, 0 otherwise.
        return pict1Idx == pict2Idx; # Naive: pictures are similar if they are the same.

Execution file

The execution file is really simple since the inputs are already formatted.

from MapperSimilarity import MapperSimilarity
from ReducerSimilarity import ReducerSimilarity
from MapReduce import MapReduce
from FileHelper import FileHelper

nThreads = 1;

# Create instances for mapper and reducer
maxPictures = 20;
theMapper = MapperSimilarity(maxPictures);
theReducer = ReducerSimilarity();

# MapReduce
theMapReducer = MapReduce(theMapper,theReducer,['Your input file path/file name'],0,nThreads)
resultDict = theMapReducer.execute()

# Write output
outFileDirectory = 'Your output file directory'
outfileName = 'Your output file name';
FileHelper.writeDictionnary(outFileDirectory+outfileName,resultDict)

Output

The output is supposed to be :

11 : [11]
10 : [10]
13 : [13]
... (etc)
9 : [9]
8 : [8]

Note that the order doesn't matter. You just need to have all numbers between 0 and 19 inclusive.

Multi-thread performance

We can now assess the advantage of having more than a single thread when the map or reduce operation is expensive. For this test, we have defined a new computeSimilarity function.

import time
class ReducerSimilarity:
    def reduce(self,context):
        pict1Idx = int(context.key);
        iterator = context.iterator;
        similarVec = [];
        pict2 = iterator.getNext()
        
        def computeSimilarity(pict1Idx,pict2Idx):
            time.sleep(0.25); # wait 0.25 seconds
            return (abs(pict1Idx-pict2Idx)<=1)
    
        while pict2:
            pict2 = int(pict2)
            if(computeSimilarity(pict1Idx,pict2) == 1):
                similarVec.append(pict2)
            pict2 = iterator.getNext()
        return similarVec

In this function, we simulate the fact that computing the similarity between a pair of pictures can take a non-negligible amount of time. Also, to avoid a very long total computation time, each chunk contains only one integer. The following table shows how the computation time evolves with the number of threads.

Multi-threading applied to picture similarity
Number of threads Execution time [s]
1 52.69
2 27.14
3 19.18
5 10.93
10 5.67
15 5.17
20 5.21

We can clearly see an almost linear speedup at the beginning, followed by a stagnation. This is due to the fact that we have a limited number of cores available, and also that this simple algorithm has pretty bad load balancing: the list of values for the first picture is [0, 1, 2, ..., 19] while for the last one it is only [19]!

Hadoop and MapReduce

Hadoop (official webpage) is one of the most famous implementations of a DFS (Distributed File System) and MapReduce. In this section, we provide a simple tutorial on how to install Hadoop and how to run some basic algorithms, like counting words or matrix-vector multiplication.

All the code used in this section can be downloaded here.

Prerequisites

To install and run simple Hadoop jobs, you will need

  • A Linux or Mac OS X system (Windows doesn't seem to be fully supported yet)
  • Java

Installing Hadoop

Hadoop can run:

  1. In Standalone mode: this is the simplest mode. It only uses one core of your computer and doesn't take advantage of the DFS. It is useful for debugging and prototyping algorithms.
  2. In Pseudo-distributed mode: this is the intermediate level. It uses the DFS and multiple cores, but still runs on a single computer.
  3. In Fully-distributed mode: this is the most complex setup: it uses multiple computers (a cluster).

We will show you how to install and use the first mode.

You can find an official tutorial for the first two modes here.

Download and install

First download hadoop from one of the official mirrors.

Uncompress the archive and copy it to any folder on your computer. In this tutorial, we'll assume you copied it into

/Users/yourname/Hadoop/hadoop-2.5.2

(replace 2.5.2 by the version you downloaded)

For example, the path to the README.txt file is

/Users/yourname/Hadoop/hadoop-2.5.2/README.txt

Update environment variables

Now, we need to modify some environment variables. Open the file

/Users/yourname/Hadoop/hadoop-2.5.2/etc/hadoop/hadoop-env.sh

and add or modify the following lines

# set to the root of your Java installation
export JAVA_HOME=/library/Java/JavaVirtualMachines/jdk1.8.0_20.jdk/Contents/Home/
# Assuming your installation directory is /Users/yourname/Hadoop/hadoop-2.5.2
export HADOOP_PREFIX=/Users/yourname/Hadoop/hadoop-2.5.2

The JAVA_HOME line should be adapted to your system: it should point to the Java home directory (the one containing the LICENSE and COPYRIGHT files).

Hadoop is now installed. Open a terminal and navigate to

/Users/yourname/Hadoop/hadoop-2.5.2/

then type

bin/hadoop

It should display Hadoop's help.

Running MapReduce : the WordCount example

Hadoop is now installed, and you should be able to run basic MapReduce jobs using only one JAVA process.

Let's see how to run a basic WordCount example (the code of this example comes from both the official Hadoop tutorials and here).

First, create the JAVA class. Simply download the following JAVA class and save it as

/Users/yourname/Hadoop/hadoop-2.5.2/WordCount.java
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {

            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);

            while (tokenizer.hasMoreTokens()) {
                value.set(tokenizer.nextToken());
                output.collect(value, new IntWritable(1));
            }

        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }

            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {

        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);

    }
}

In this folder, create a folder named

/Users/yourname/Hadoop/hadoop-2.5.2/input_wordcount

and fill it with the text files you want to analyse, for example the Twitter data. You can have multiple files.

Then, open a terminal, navigate to the Hadoop folder, and enter the following commands.

First, update environment variables in terminal :

export JAVA_HOME=/library/Java/JavaVirtualMachines/jdk1.8.0_20.jdk/Contents/Home/
export PATH=$JAVA_HOME/bin:$PATH
export HADOOP_CLASSPATH=$JAVA_HOME/lib/tools.jar

Then, compile the JAVA class,

bin/hadoop com.sun.tools.javac.Main WordCount.java

create the JAR file,

jar cf WordCount.jar WordCount*.class

and finally run Hadoop :

bin/hadoop jar WordCount.jar WordCount input_wordcount output_wordcount

At the end, the

output_wordcount

folder will contain two files. The first one, named

_SUCCESS

simply means everything went fine. The second one, named

part-00000

contains the output of MapReduce, namely <key, value> pairs. In this case, the keys are the words and the values are the numbers of occurrences.

WordCount in more detail

Let's now analyse the WordCount example in more detail.

The WordCount.java file consists of 3 classes : the mapper, the reducer and the main class. The main class is mainly here to define the classes of the <Key, Value> pairs, of the inputs and of the outputs. The other two classes are more interesting.

Mapper

The map function takes 4 arguments. The first one is the key, followed by the value, then the output collector and a reporter. Except for the last one, this is exactly the standard Map-Reduce interface: the mapper takes one <Key, Value> pair as input and outputs zero, one or more <Key, Value> pairs.

Basically, in this case, the value is a line from a text file, while the key is irrelevant. First, we break the line into words using the StringTokenizer class. Then, while the line still contains words, we set value to the current word and update the output with

output.collect(value, new IntWritable(1));

This is the classical <Word, 1> key-value pair of the WordCount example.

Reducer

The code of the reducer is even simpler.

int sum = 0;
while (values.hasNext()) {
    sum += values.next().get();
}
output.collect(key, new IntWritable(sum));

Basically, we iterate over the elements of values (with the hasNext() and next() methods) and add each value to sum. At the end, we return the <Key, Sum> pair, where Key is the same key as in the input (the word) and Sum is the sum of the values in the list.

A few results with Twitter data

Now that we know how to use Map-Reduce and Hadoop, we're ready to tackle some big problems.

The first one - maybe the easiest - would be to count, for each word, the number of occurrences in the tweets. To do that, the first step is to extract the content of the tweets. Then, we can simply run the WordCount algorithm on it. Finally, we can sort the words by number of occurrences to extract the most frequent ones.
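
The sorting step can be done outside Hadoop with a few lines of Python (a sketch, assuming the usual tab-separated part-00000 output produced by TextOutputFormat):

# Sort the Hadoop output (one "word<TAB>count" per line) by decreasing count
counts = []
with open('output_wordcount/part-00000') as f:
    for line in f:
        word, count = line.rsplit('\t', 1)
        counts.append((int(count), word))

counts.sort(reverse=True)
for (count, word) in counts[:20]:            # the 20 most frequent words
    print('%s %d' % (word, count))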

Results of WordCount over the Twitter dataset
Word Occurrences Word Occurrences Word Occurrences Word Occurrences Word Occurrences
de 1420912 je 931274 ???? 778256 a 676432 la 673392
pas 632459 en 592281 le 532160 que 500950 me 493155
! 473470 et 401930 les 383593 at 377620 c'est 375132
I'm 370016 un 366329 ? 334248 Je 317364 à 301413
in 289383 ik 288244 is 282895 pour 270614 des 257046
tu 251836 on 251521 mon 245422 est 243473 ma 241448
une 238427 trop 235215 ça 234347 j'ai 232933 il 230798
qui 228101 ???????? 226551 een 222578 van 212019 te 206896
avec 202559 mais 202257 moi 198920 plus 196125 ce 191429
dans 189510 the 185812 suis 185590 fait 183121 ?? 180375
du 178083 au 174338 elle 173394 J'ai 167742 met 166630
het 165778 I 163449 (@ 160016 op 157904 sur 154954
you 153844 va 149515 to 149449 bien 145425 - 144929
w/ 144396 tout 140681 vais 136479 faire 133084 @ 132406
voor 131484 dat 124911 sa 122852 se 122303 niet 120467
die 118920 Ik 116627 :) 111440 même 110788 of 110710
comme 110279 quand 106859 , 106765 si 104488 ne 103289
toi 99144 y 98639 ?????? 98236 ???????????? 97659 C'est 94099
maar 93226 mes 93127 2 91878 dit 90925 vous 90547
my 88188 nog 87546 and 86785 echt 84366 mdr 83533

Extension of WordCount

To avoid some of the strange results in the previous table (?????? or :) for example), we can add some criteria on the words we select directly inside the mapper. The mapper can also take care of some "conversions" (like putting all words in lower case, etc.). The code of the map function then becomes slightly more complicated:

@Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
    
    String line = value.toString();
    StringTokenizer tokenizer = new StringTokenizer(line);

    while (tokenizer.hasMoreTokens()) {
        String word = tokenizer.nextToken() ; 
        // Remove accent
        word = deAccent(word) ;
        // Set to lower case and remove punctuation                               
        word = word.replaceAll("[^a-zA-Z0-9'#]", "") ;
        //System.out.println("From " + word1 + " to " + word) ;
        if (word.length() > 0) {
            word = word.toLowerCase() ;
            value.set(word);                
            output.collect(value, new IntWritable(1));   
        }             
    }

}

where deAccent is a function that removes accents.

In this case, the results are more regular:

Results of WordCount1 over the Twitter dataset
Word Occurrences Word Occurrences Word Occurrences Word Occurrences Word Occurrences
de 1511822 je 1276007 a 1047597 la 823707 pas 685017
en 680956 le 620439 me 538259 que 518545 c'est 477853
et 468833 les 467209 ik 416741 j'ai 406732 un 399836
at 394207 i'm 379167 ca 349692 on 333387 in 316186

Now, let's imagine we only want to count words with at least 10 letters, beginning with an 'a'. How can we do that using Map-Reduce? Of course, we could use the previous algorithm and then extract the data we care about. But let's be smarter: we can modify the map function directly. The code will then be

@Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
    throws IOException {

    String line = value.toString();
    StringTokenizer tokenizer = new StringTokenizer(line);

    while (tokenizer.hasMoreTokens()) {
        String word = tokenizer.nextToken() ;
        if (word.length() >= 10 && word.charAt(0) == 'a') {
            value.set(word);                
            output.collect(value, new IntWritable(1));   
        }             
    }
}

And the results are

Results of WordCount2 over the Twitter dataset
Word Occurrences Word Occurrences Word Occurrences Word Occurrences Word Occurrences
aujourd'hui 19942 anniversaire 18576 après-midi 4547 afgehandeld 2465 absolument 1941
automatically 1863 aujourd'hui, 1640 apparemment 1398 aflevering 1330 aujourd'hui 1309
actuellement 965 antwoorden 818 ahahahahah 683 abonnement 628 aangesproken 578
achtergrond 554 anniversaire, 531 aujourd'hui.. 497 afgesloten 492 aujourd'hui! 484

Finally, if we only want to extract useful words (defined as words with at least 8 characters, beginning with an actual letter), we can use this mapper

@Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
    throws IOException {

    String line = value.toString();
    StringTokenizer tokenizer = new StringTokenizer(line);

    while (tokenizer.hasMoreTokens()) {
        String word = tokenizer.nextToken() ;
        if (word.length() >= 8 && Character.isLetter(word.charAt(0))) {
            value.set(word);                
            output.collect(value, new IntWritable(1));   
        }             
    }

}

In this case, the results are the following

Results of WordCount3 over the Twitter dataset
Word Occurrences Word Occurrences Word Occurrences Word Occurrences Word Occurrences
vraiment 47152 toujours 44705 tellement 31380 personne 27850 West-Vlaanderen 21341
Oost-Vlaanderen 20516 aujourd'hui 19942 maintenant 19751 anniversaire 18576 commence 18395
vacances 18353 regarder 17572 pourquoi 17107 Antwerpen 17091 richting 14850
meilleur 12063 Brussels 11929 hahahaha 11855 Pourquoi 11774 Vlaams-Brabant 11609

This seems interesting: there is a very high number of occurrences of the names of the Flemish provinces in Belgium. This might of course be due to the fact that there are more Dutch-speaking people than French-speaking people. Still, they appear so many times that this result might seem counter-intuitive at first sight.

The Matrix-Vector Multiplication using Hadoop

Another example of computation that can be done with Map-Reduce is the Matrix-Vector multiplication.

In this case, we want to compute x = A*b, where A is a sparse matrix and b is a vector.

The Map-Reduce algorithm is then the following.

  • Map: the input is an entry of the matrix A stored as a triplet (i, j, A_ij). The output is the following key-value pair: (i, A_ij * b_j).
  • Reduce: the input is a key-values pair (i, [A_ij * b_j for all j]) and the output is (i, SUM_j A_ij * b_j), i.e. the i-th entry of x.

The Java code for this MapReduce is the following

import java.io.IOException;
import java.io.File;
import java.io.BufferedReader;
import java.io.InputStreamReader;

import java.util.Iterator;
import java.util.StringTokenizer;
import java.util.Scanner;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;



public class SparseMatrixVector {

    private static double[] b ; // The vector    

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, IntWritable, DoubleWritable> {

        @Override
        public void map(LongWritable key, Text value, OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
                throws IOException {

            String line = value.toString(); // (i, j, A_ij)
            String[] IJAij = line.split(" "); // Divide string into (at least) 3 elements : i, j, and A_ij
            int i = Integer.parseInt(IJAij[0]) ;
            int j = Integer.parseInt(IJAij[1]) ;
            double Aij = Double.parseDouble(IJAij[2]) ;
            
            output.collect(new IntWritable(i), new DoubleWritable(Aij * b[j-1])); // Output (i, A_ij b_j) CAUTION : j-1 because of Matlab indexing
        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {

        @Override
        public void reduce(IntWritable key, Iterator<DoubleWritable> values,
                OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
                throws IOException {

            double sum = 0.0 ;
            while (values.hasNext()) {
                sum += values.next().get();
            }            
            output.collect(key, new DoubleWritable(sum)); // Output (i, SUM_j A_ij b_j)
            
        }
    }

    public static void main(String[] args) throws Exception {

        /* Display arguments */
        System.out.println("====================================") ;
        System.out.println("b vector path : " + args[0]) ;
        System.out.println("b vector size : " + args[1]) ;
        System.out.println("A matrix folder path : " + args[2]) ;
        System.out.println("Ax vector folder path : " + args[3]) ;
        System.out.println("====================================") ;

        /* 1) Create the b vector : every map can access it in memory */        
        int n = Integer.parseInt(args[1]) ;
        b = new double[n] ;
        int i = 0 ;
        Path b_vector_path = new Path(args[0]) ;                   
        FileSystem fs = FileSystem.get(new Configuration());
        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(b_vector_path)));
        String line;
        
        line = br.readLine();
        while (line != null){                
            if (i >= n) {
                System.out.println("i out of bounds !") ;
            }
            b[i] = Double.parseDouble(line);
            i ++ ;
            line = br.readLine();
        }
    
        /* 2) Run MapReduce to compute x = A*b */
        JobConf conf = new JobConf(SparseMatrixVector.class);
        conf.setJobName("sparseMatrixVector");

        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputValueClass(DoubleWritable.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[2]));
        FileOutputFormat.setOutputPath(conf, new Path(args[3]));

        JobClient.runJob(conf);

    }
}

This code can then be compiled and run by typing

bin/hadoop com.sun.tools.javac.Main SparseMatrixVector.java
jar cf SparseMatrixVector.jar SparseMatrixVector*.class
bin/hadoop jar SparseMatrixVector.jar SparseMatrixVector b2_vector 200000 A/ output_sparseMatrixVector/

where

b2_vector

is the path to the vector,

200000

is the size of the vector,

A/

is the path to the folder containing the matrix (which can be divided into small files) and

output_sparseMatrixVector/

is the folder where the results should be written.

Performance

We can now try to assess the performance of MapReduce on this Matrix-Vector multiplication. In this example, we used sparse tridiagonal matrices.

Performance of Map-Reduce on Matrix-Vector multiplication
Matrix size Execution time [s]
100 1.8
1 000 1.8
10 000 1.9
100 000 3.7
1 000 000 9.6
10 000 000 85

We see that the complexity is asymptotically linear, but for small sizes the algorithm isn't effective at all. This shows that Hadoop is useful for tackling large problems, but shouldn't be used for solving small problem instances.
