Slides
The slides of the presentation (October 13, 2014) about MapReduce are uploaded here. File:MapReduce.pdf The slides of the project presentation (December 17, 2014) about Hadoop and PyMR are here. File:SlidesProject.pdf
Python implementation of MapReduce (PyMR)
The first goal of this project was to implement an easy-to-use, user-friendly Python version of MapReduce, in order to easily design and prototype MapReduce algorithms.
How to use PyMR?
This section will give you general instructions in order to use the PyMR library. The complete documentation of PyMR is available here.
Step 0: Software requirements
If you want to use the PyMR library, you must have:
- Python 2.7.x (not Python 3.0!)
- PyMR library, available here.
Don't forget to add the line
from MapReduce import MapReduce
at the beginning of your script.
Step 1: Parsing data
The data given to the MapReduce algorithm must be files with one input per line. For instance, if you have a file containing the text "Hello World!" and the mapper needs a single word as input, then you have to transform your file into:
hello
world
You can also use the FileHelper class, available in the PyMR library, which can help you easily parse your input files. The full documentation is available here. Remember that you can also contribute to improving the PyMR library: if you write a new parsing function and think it could be helpful to others, you are encouraged to submit it :-).
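For reference, a plain-Python sketch of this parsing step (the function names here are illustrative, not part of FileHelper) could look like:

```python
import re

def text_to_words(text):
    # Split a raw text into lowercase words, dropping punctuation,
    # so that each word can later be written on its own line.
    return re.findall(r"[a-z0-9']+", text.lower())

def parse_file(in_path, out_path):
    # Read a raw text file and write one word per line,
    # the input format expected by the mapper.
    with open(in_path) as f:
        words = text_to_words(f.read())
    with open(out_path, 'w') as f:
        f.write('\n'.join(words) + '\n')
```

For example, `text_to_words("Hello World!")` returns `['hello', 'world']`.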
Step 2: Create the Mapper and the Reducer
Since the MapReduce algorithm requires a user-defined mapper and reducer, you need to implement a mapper class and a reducer class yourself.
The Mapper
The mapper needs to have a method called "map", which takes two arguments: self and a MapContext named theContext. You can read the documentation of the MapContext class here. Here is an empty mapper class:
class EmptyMapper:
    def map(self, theContext):
        iterator = theContext.iterator
        input = iterator.getNext()
        while input is not None:
            (key, value) = something(input)
            theContext.putKeyValue(key, value)
            input = iterator.getNext()
The Reducer
The reducer needs to have a method called "reduce", which takes two arguments: self and a ReduceContext named theContext. You can read the documentation of the ReduceContext class here. Here is an empty reducer class:
class EmptyReducer:
    def reduce(self, context):
        key = context.key
        iterator = context.iterator
        value = iterator.getNext()
        while value is not None:
            # do something with value.
            value = iterator.getNext()
        return # something
Step 3: Create the MapReducer and launch the algorithm
Once the mapper and reducer are created, you just need to instantiate them, create an instance of the MapReduce class and call the execute() routine. You can see a generic example below.
from MapReduce import MapReduce
from YourMapper import YourMapper
from YourReducer import YourReducer
theMapper = YourMapper(MapInputs)
theReducer = YourReducer(ReduceInputs)
theMapReducer = MapReduce(theMapper,theReducer,listOfInputParsedFiles)
resultDictionary = theMapReducer.execute()
The execute() routine will give you as output a dictionary with all key/value pairs generated by your reducer. For more information, you can read the documentation of the MapReduce class here.
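To make these semantics concrete, here is a minimal, purely illustrative sketch of what execute() conceptually does (the real PyMR class also handles file chunking and threads; the names below are hypothetical):

```python
def run_map_reduce(mapper_fn, reducer_fn, inputs):
    # Map phase: each input produces zero or more (key, value) pairs.
    pairs = []
    for item in inputs:
        pairs.extend(mapper_fn(item))
    # Shuffle phase: group all values by key.
    groups = {}
    for key, value in pairs:
        groups.setdefault(key, []).append(value)
    # Reduce phase: each (key, value list) is reduced to one result.
    return {key: reducer_fn(key, values) for key, values in groups.items()}

# Word count expressed with this sketch:
result = run_map_reduce(
    lambda word: [(word, 1)],
    lambda key, values: sum(values),
    ['hello', 'world', 'hello'],
)
# result == {'hello': 2, 'world': 1}
```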
Warning: since you can create your own mapper and reducer, you might be tempted to do unusual things inside the map and reduce methods, like creating and accessing instance variables. Note that if you intend to run your program using the parallel version of PyMR, the algorithm will call the map and reduce functions multiple times simultaneously, so don't write anything outside the map and reduce functions. You can safely read shared variables, but don't write to them!
Counting words with PyMR
You can find below a complete example of counting words with PyMR. You can also run the file demo_CountingWords.py.
Input File
Create a file named dataFile in C:\mapReduceCountingWords with some content, for example:
Hello! It is a Hello world!
Mapper
class MapperCountingWords:
    def map(self, theContext):
        mapIterator = theContext.iterator
        word = mapIterator.getNext()
        while word is not None:
            word = word.rstrip('\n')
            if word:
                theContext.putKeyValue(word, 1)
            word = mapIterator.getNext()
Reducer
class ReducerCountingWords:
    def reduce(self, context):
        key = context.key
        iterator = context.iterator
        totValue = 0
        actualValue = iterator.getNext()
        while actualValue is not None:
            totValue = totValue + int(actualValue)
            actualValue = iterator.getNext()
        return totValue
ExecutionFile
from MapperCountingWords import MapperCountingWords
from ReducerCountingWords import ReducerCountingWords
from MapReduce import MapReduce
from FileHelper import FileHelper
# Create instances for mapper and reducer
theMapper = MapperCountingWords()
theReducer = ReducerCountingWords()
# Parse the file: one word per line
inFiles = ['C:/mapReduceCountingWords/dataFile']
# we can have more than one text file
inFileParsed = 'files/dataFileParsed'
FileHelper.transformTextIntoListOfWords(inFiles, inFileParsed)
# MapReduce
theMapReducer = MapReduce(theMapper, theReducer, [inFileParsed], silent=0, nThreads=5)
resultDict = theMapReducer.execute()
# Write output
outFileDirectory = 'C:/mapReduceCountingWords/'
outFileName = 'coutingWordsResults.txt'
FileHelper.writeDictionnary(outFileDirectory + outFileName, resultDict)
Output
The output file, named coutingWordsResults.txt, will be written in the folder C:\mapReduceCountingWords\. The content of this file should be:
a : 1
world : 1
is : 1
hello : 2
it : 1
Matrix-vector multiplication with PyMR
You can find below a complete example of a matrix-vector multiplication with PyMR. The goal of this example is to compute a matrix-vector product with MapReduce.
The general MapReduce algorithm for computing a matrix-vector multiplication is the following. We want to compute x = A b. The MapReduce algorithm is then:
- Map: the input is an entry of the matrix A stored as a triplet (i, j, A_ij). The output is the following key-value pair: (i, A_ij * b_j).
- Reduce: the input is a key-values pair (i, [A_ij * b_j for all j]) and the output is (i, x_i), where x_i = sum_j A_ij * b_j.
The specific operation performed in this example is (using Matlab notation):
[1 2 ; 1 0] * [1 ; 1]
The result is indeed:
[3 ; 1]
Inputs files
Create a file with the name "A_matrix" in C:\mapReduceMatrixMultiplication with the following content. Note that zero values are omitted (this is a sparse notation).
1 1 1.0
1 2 2.0
2 1 1.0
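The triplet lines above can be produced from a dense matrix with a short helper (illustrative only, not part of PyMR):

```python
def dense_to_triplets(matrix):
    # Convert a dense matrix (list of rows) into sparse triplet lines
    # "i j A_ij" with 1-based indices, omitting zero entries.
    lines = []
    for i, row in enumerate(matrix):
        for j, value in enumerate(row):
            if value != 0:
                lines.append('%d %d %s' % (i + 1, j + 1, float(value)))
    return lines
```

For the example matrix, `dense_to_triplets([[1, 2], [1, 0]])` yields exactly the three lines of "A_matrix".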
In addition, create the file "b_vector" in C:\mapReduceMatrixMultiplication with the following content.
1
1
Mapper
The following mapper loads the vector b from the file into memory at initialization.
class MapperMatrixVector:
    def __init__(self, vectorFile):
        num_lines = sum(1 for line in open(vectorFile))
        self.vector = [0 for i in range(num_lines)]
        f = open(vectorFile, "r")
        counter = 0
        for line in f:
            self.vector[counter] = float(line.rstrip('\n'))
            counter = counter + 1
        f.close()

    def map(self, theContext):
        mapIterator = theContext.iterator
        entry = mapIterator.getNext()
        while entry is not None:
            entry = entry.split()
            line = entry[0].rstrip('\n')
            col = entry[1].rstrip('\n')
            val = float(entry[2])
            theContext.putKeyValue(line, val * self.vector[int(col) - 1])
            entry = mapIterator.getNext()
Reducer
class ReducerMatrixVector:
    def reduce(self, context):
        iterator = context.iterator
        totValue = 0
        actualValue = iterator.getNext()
        while actualValue is not None:
            totValue = totValue + float(actualValue)
            actualValue = iterator.getNext()
        return totValue
ExecutionFile
from MapperMatrixVector import MapperMatrixVector
from ReducerMatrixVector import ReducerMatrixVector
from MapReduce import MapReduce
from FileHelper import FileHelper
# Create instances for mapper and reducer
# Note that the vector is stored in the instance
theReducerMatrixVector = ReducerMatrixVector()
theMapperMatrixVector = MapperMatrixVector('C:/mapReduceMatrixMultiplication/b_vector')
# the file where the matrix is stored
matrixFile = ['C:/mapReduceMatrixMultiplication/A_matrix']
# MapReduce
theMapReducerMatrixVector = MapReduce(theMapperMatrixVector, theReducerMatrixVector, matrixFile, 0, 1)
resultDictMatrixVector = theMapReducerMatrixVector.execute()
# Write output, one entry per row index, in increasing order
resultListMatrixVector = []
for i in range(1, len(resultDictMatrixVector) + 1):
    if resultDictMatrixVector.has_key(str(i)):
        resultListMatrixVector.append(resultDictMatrixVector[str(i)])
    else:
        resultListMatrixVector.append(0)
FileHelper.writeListInFile('C:/mapReduceMatrixMultiplication/MatrixVectorResults.txt', resultListMatrixVector)
Output
The output is the file "MatrixVectorResults.txt". As expected, the file contains the following lines:
3.0
1.0
Performances
We can now start to wonder about the performance of our MapReduce implementation. The matrix-vector multiplication is a simple algorithm, well suited for performance analysis. Here, we created tridiagonal sparse matrices of increasing sizes. Execution times are presented in the following table.
Size of the matrix | Execution time [s] |
---|---|
100 | 0.05 |
1 000 | 0.4 |
10 000 | 5.18 |
100 000 | 320 |
As expected, the algorithm is linear at the beginning, but then performance deteriorates, probably due to an overflow of the hard drive cache.
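The test matrices can be generated with a short script like the following (a sketch; the exact values on the diagonals are an arbitrary choice here and do not matter for the timing):

```python
def tridiagonal_triplets(n):
    # Generate the sparse triplet lines "i j A_ij" (1-based indices)
    # of an n x n tridiagonal matrix with 2 on the diagonal
    # and -1 on the sub- and super-diagonals.
    lines = []
    for i in range(1, n + 1):
        if i > 1:
            lines.append('%d %d %s' % (i, i - 1, -1.0))
        lines.append('%d %d %s' % (i, i, 2.0))
        if i < n:
            lines.append('%d %d %s' % (i, i + 1, -1.0))
    return lines
```

An n x n tridiagonal matrix has 3n - 2 nonzero entries, so the input file grows linearly with the matrix size.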
Simulation of picture similarities
In this section we will see a general model to compute similarities between pairs of pictures over a whole set of pictures. Since the goal of this project is not signal processing, we will just use a very simple model of a function which computes the similarity between two pictures.
- Mapper: the goal of the mapper is to generate all possible pairs of pictures (without the symmetric duplicates). To do this, we create keys i = 0, ..., N-1 (where N is the number of pictures). For each key i we emit all possible values j subject to j >= i. In our case, the input of the mapper is simply the first picture: the key is this picture i, and the values form the list [i, i+1, ..., N-1].
- Reducer: the goal of the reducer is to analyse all key-values pairs with a function s(i, j) which gives a similarity measure between two pictures i and j. The reducer keeps the key, which is the picture i, iterates over all elements j in the list, and computes the similarity between i and j. The similar pictures are stored in a list which is returned at the end of the execution.
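The pair generation described above can be checked with a few lines of plain Python (N = 4 here for illustration):

```python
def generate_pairs(n):
    # All pairs (i, j) of picture indices with j >= i:
    # exactly the (key, value) pairs the mapper emits.
    return [(i, j) for i in range(n) for j in range(i, n)]

pairs = generate_pairs(4)
# 4 + 3 + 2 + 1 = 10 pairs: (0, 0), (0, 1), ..., (3, 3)
```

For N pictures this produces N(N+1)/2 pairs instead of N^2, which is the point of exploiting the symmetry.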
Input file
The input file contains only references to the pictures, or the pictures themselves. To make the code simpler, we will just write one number per picture (we can suppose that with this number we can retrieve the right picture). For example, we can generate this content:
0
1
... (continue)
18
19
Mapper
The goal of the mapper is to create all pairs of possible pictures. We assume that the similarity between two pictures is symmetric, so we will only generate pairs (i, j) where j >= i. Also, to make the mapper simpler, we suppose that we know in advance the size of the set of pictures that needs to be analysed.
class MapperSimilarity:
    def __init__(self, maxPictures):
        self.maxPictures = maxPictures

    def map(self, theContext):
        mapIterator = theContext.iterator
        pictureIdx = mapIterator.getNext()
        while pictureIdx is not None:
            pictureIdx = int(pictureIdx.rstrip('\n'))
            # Generate all pairs (i,j) where j >= i.
            for i in range(pictureIdx, self.maxPictures):
                theContext.putKeyValue(pictureIdx, i)
            pictureIdx = mapIterator.getNext()
Reducer
The reducer will use the function which computes similarity.
class ReducerSimilarity:
    def reduce(self, context):
        pict1 = int(context.key)
        iterator = context.iterator
        similarVec = []
        pict2 = iterator.getNext()
        while pict2 is not None:
            pict2 = int(pict2)
            if ReducerSimilarity.computeSimilarity(pict1, pict2) == 1:
                similarVec.append(pict2)
            pict2 = iterator.getNext()
        return similarVec

    @staticmethod
    def computeSimilarity(pict1Idx, pict2Idx):
        # User-defined function.
        # Return 1 if pictures are similar, 0 otherwise.
        return pict1Idx == pict2Idx  # Naive: pictures are similar if they are the same.
Execution file
The execution file is really simple since the inputs are already formatted.
from MapperSimilarity import MapperSimilarity
from ReducerSimilarity import ReducerSimilarity
from MapReduce import MapReduce
from FileHelper import FileHelper
nThreads = 1
# Create instances for mapper and reducer
maxPictures = 20
theMapper = MapperSimilarity(maxPictures)
theReducer = ReducerSimilarity()
# MapReduce
theMapReducer = MapReduce(theMapper, theReducer, ['Your input file path/file name'], 0, nThreads)
resultDict = theMapReducer.execute()
# Write output
outFileDirectory = 'Your output file directory'
outFileName = 'Your output file name'
FileHelper.writeDictionnary(outFileDirectory + outFileName, resultDict)
Output
The output is supposed to be:
11 : [11]
10 : [10]
13 : [13]
... (etc)
9 : [9]
8 : [8]
Note that the order doesn't matter. You just need to have all numbers between 0 and 19 inclusive.
Multi-thread performances
We can now assess the advantage of having more than a single thread when the computation of the map or the reduce operation is expensive. To do this test, we have defined a new function computeSimilarity.
import time

class ReducerSimilarity:
    def reduce(self, context):
        pict1Idx = int(context.key)
        iterator = context.iterator
        similarVec = []
        pict2 = iterator.getNext()

        def computeSimilarity(pict1Idx, pict2Idx):
            time.sleep(0.25)  # waits 0.25 seconds
            return abs(pict1Idx - pict2Idx) <= 1

        while pict2 is not None:
            pict2 = int(pict2)
            if computeSimilarity(pict1Idx, pict2) == 1:
                similarVec.append(pict2)
            pict2 = iterator.getNext()
        return similarVec
In this function, we simulate the fact that computing the similarity for a pair of pictures can take a non-negligible amount of time. Also, to avoid really long total computation times, each chunk contains only one integer. The following table shows the evolution of the computation time as a function of the number of threads.
Number of threads | Execution time [s] |
---|---|
1 | 52.69 |
2 | 27.14 |
3 | 19.18 |
5 | 10.93 |
10 | 5.67 |
15 | 5.17 |
20 | 5.21 |
We can clearly see the almost linear speedup at the beginning, followed by a stagnation. This is due to the fact that we have a limited number of cores available, and also that this simple algorithm has pretty bad load balancing: the list of values for the first picture is [0, 1, 2, ..., 19] while for the last one it is only [19]!
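The load imbalance can be quantified directly from the pair-generation scheme: with N pictures, key i receives N - i values, so the first reduce task does N similarity computations while the last does only one.

```python
def values_per_key(n):
    # Number of values the reducer receives for each key i:
    # the mapper emits (i, j) for every j in [i, n), i.e. n - i values.
    return {i: n - i for i in range(n)}

load = values_per_key(20)
# load[0] == 20, load[19] == 1
```

With a 0.25 s sleep per pair and 210 pairs in total, a perfectly balanced run on many threads would still be bounded below by the heaviest key, 20 * 0.25 = 5 s, which matches the observed stagnation around 5 seconds.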
Hadoop and MapReduce
Hadoop (official webpage) is one of the most famous implementations of a DFS (Distributed File System) and MapReduce. In this section, we provide a simple tutorial on how to install Hadoop and how to run some basic algorithms, like counting words or matrix-vector multiplication.
All the code used in this section can be downloaded here.
Prerequisites
To install and run simple Hadoop jobs, you will need
- A Linux or Mac OS X system (Windows doesn't seem to be fully supported yet)
- Java
Installing Hadoop
Hadoop can run:
- In Standalone mode: this is the simplest mode. It only uses one core of your computer, and doesn't take advantage of the DFS. It is useful to debug and prototype algorithms.
- In Pseudo-distributed mode: this is the intermediate level. It uses the DFS and multiple cores, but still runs on one single computer.
- In Full-distributed mode: this is the most complex setup: it uses multiple computers (a cluster).
We will show you how to install and use the first mode.
You can find an official tutorial for the first two modes here.
Download and install
First download hadoop from one of the official mirrors.
Uncompress the archive and copy it into any folder on your computer. In this tutorial, we'll assume you copy it into
/Users/yourname/Hadoop/hadoop-2.5.2
(replace 2.5.2 by the version you downloaded)
For example, the path to the README.txt file is
/Users/yourname/Hadoop/hadoop-2.5.2/README.txt
Update environment variables
Now, we need to modify some environment variables. Open the file
/Users/yourname/Hadoop/hadoop-2.5.2/etc/hadoop/hadoop-env.sh
and add or modify the following line
# set to the root of your Java installation
export JAVA_HOME=/library/Java/JavaVirtualMachines/jdk1.8.0_20.jdk/Contents/Home/
# Assuming your installation directory is /Users/yourname/Hadoop/hadoop-2.5.2
export HADOOP_PREFIX=/Users/yourname/Hadoop/hadoop-2.5.2
The JAVA_HOME line should be adapted to your system: it should point to the Java home directory (the one containing the LICENSE and COPYRIGHT files).
Hadoop is now installed. Open a terminal and navigate to
/Users/yourname/Hadoop/hadoop-2.5.2/
then type
bin/hadoop
It should display Hadoop's help.
Running MapReduce : the WordCount example
Hadoop is now installed, and you should be able to run basic MapReduce jobs using only one JAVA process.
Let's see how to run a basic WordCount example (the code of this example comes from both Hadoop official tutorials and here).
First, create the JAVA class. Simply download the following JAVA class and save it as
/Users/yourname/Hadoop/hadoop-2.5.2/WordCount.java
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                value.set(tokenizer.nextToken());
                output.collect(value, new IntWritable(1));
            }
        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
In this folder, create a folder named
/Users/yourname/Hadoop/hadoop-2.5.2/input_wordcount
and fill it with the text files you want to analyse, for example the Twitter data. You can have multiple files.
Then, open a terminal, navigate to the Hadoop folder, and enter the following commands.
First, update environment variables in terminal :
export JAVA_HOME=/library/Java/JavaVirtualMachines/jdk1.8.0_20.jdk/Contents/Home/
export PATH=$JAVA_HOME/bin:$PATH
export HADOOP_CLASSPATH=$JAVA_HOME/lib/tools.jar
Then, compile the JAVA class,
bin/hadoop com.sun.tools.javac.Main WordCount.java
create the JAR file,
jar cf WordCount.jar WordCount*.class
and finally run Hadoop:
bin/hadoop jar WordCount.jar WordCount input_wordcount output_wordcount
At the end, the
output_wordcount
folder will contain two files. The first one, named
_SUCCESS
simply means everything went fine. The second one, named
part-00000
contains the output of MapReduce, namely <key, value> pairs. In this case, the keys are the words and the values are the numbers of occurrences.
WordCount in more details
Let's now analyse the WordCount example in more details.
The WordCount.java file consists of 3 classes: the mapper, the reducer and the main class. The main class is mainly there to define the classes of the <Key, Value> pairs, of the inputs and of the outputs. The other two classes are more interesting.
Mapper
The map function takes 4 arguments: the key, followed by the value, then by the output collector and by a reporter. Except for the last one, this is exactly the regular MapReduce standard: the mapper takes one <Key, Value> pair as input, and outputs zero, one or more <Key, Value> pairs.
Basically, in this case, the value is a line from a text file, while the key is irrelevant. First, we break the line into words using the StringTokenizer class. Then, while the line contains words, we set value to the current word, and update the output with
output.collect(value, new IntWritable(1));
This is the classical <Word, 1> key-value pair of the WordCount example.
Reducer
The code of the reducer is even simpler.
int sum = 0;
while (values.hasNext()) {
    sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
Basically, we iterate over the elements of values (with the hasNext() and next() methods), and we simply increment sum by the values. At the end, we return <Key, Sum>, where Key is the same key as in the input (the word), and Sum is the sum of the values in the values list.
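For comparison with the PyMR examples above, the same word-count logic can also be written in Python and run through Hadoop's Streaming interface, a standard Hadoop feature that pipes lines through arbitrary executables via stdin/stdout. A minimal sketch (the function names here are illustrative; a real Streaming job would read sys.stdin and print the results):

```python
def stream_map(lines):
    # Emit one "word\t1" pair per word: the Streaming wire format.
    for line in lines:
        for word in line.split():
            yield '%s\t%d' % (word, 1)

def stream_reduce(pairs):
    # Streaming delivers pairs sorted by key, so a running sum per key
    # is enough; emit "word\tcount" each time the key changes.
    current, count = None, 0
    for pair in pairs:
        word, value = pair.split('\t')
        if word != current:
            if current is not None:
                yield '%s\t%d' % (current, count)
            current, count = word, 0
        count += int(value)
    if current is not None:
        yield '%s\t%d' % (current, count)
```

The sort step between the two functions plays the role of Hadoop's shuffle phase.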
A few results with Twitter data
Now that we know how to use Map-Reduce and Hadoop, we're ready to tackle some big problems.
The first one - maybe the easiest - would be to count for each word the number of occurrences in the tweets. To do that, the first step is to extract the tweets content. Then, we can simply run the WordCount algorithm on it. Finally, we can sort the words by the number of occurrences to extract the most frequent words.
Word | Occurrences | Word | Occurrences | Word | Occurrences | Word | Occurrences | Word | Occurrences |
---|---|---|---|---|---|---|---|---|---|
de | 1420912 | je | 931274 | ???? | 778256 | a | 676432 | la | 673392 |
pas | 632459 | en | 592281 | le | 532160 | que | 500950 | me | 493155 |
! | 473470 | et | 401930 | les | 383593 | at | 377620 | c'est | 375132 |
I'm | 370016 | un | 366329 | ? | 334248 | Je | 317364 | à | 301413 |
in | 289383 | ik | 288244 | is | 282895 | pour | 270614 | des | 257046 |
tu | 251836 | on | 251521 | mon | 245422 | est | 243473 | ma | 241448 |
une | 238427 | trop | 235215 | ça | 234347 | j'ai | 232933 | il | 230798 |
qui | 228101 | ???????? | 226551 | een | 222578 | van | 212019 | te | 206896 |
avec | 202559 | mais | 202257 | moi | 198920 | plus | 196125 | ce | 191429 |
dans | 189510 | the | 185812 | suis | 185590 | fait | 183121 | ?? | 180375 |
du | 178083 | au | 174338 | elle | 173394 | J'ai | 167742 | met | 166630 |
het | 165778 | I | 163449 | (@ | 160016 | op | 157904 | sur | 154954 |
you | 153844 | va | 149515 | to | 149449 | bien | 145425 | - | 144929 |
w/ | 144396 | tout | 140681 | vais | 136479 | faire | 133084 | @ | 132406 |
voor | 131484 | dat | 124911 | sa | 122852 | se | 122303 | niet | 120467 |
die | 118920 | Ik | 116627 | :) | 111440 | même | 110788 | of | 110710 |
comme | 110279 | quand | 106859 | , | 106765 | si | 104488 | ne | 103289 |
toi | 99144 | y | 98639 | ?????? | 98236 | ???????????? | 97659 | C'est | 94099 |
maar | 93226 | mes | 93127 | 2 | 91878 | dit | 90925 | vous | 90547 |
my | 88188 | nog | 87546 | and | 86785 | echt | 84366 | mdr | 83533 |
Extension of WordCount
To avoid some of the strange results in the previous table (?????? or :) for example) we can add some criteria about the words we select directly inside the mapper. The mapper can also take care of some "conversions" (like putting all words in lower case, etc.). The code of the map function then becomes slightly more complicated:
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
    String line = value.toString();
    StringTokenizer tokenizer = new StringTokenizer(line);
    while (tokenizer.hasMoreTokens()) {
        String word = tokenizer.nextToken();
        // Remove accents
        word = deAccent(word);
        // Remove punctuation
        word = word.replaceAll("[^a-zA-Z0-9'#]", "");
        if (word.length() > 0) {
            // Set to lower case
            word = word.toLowerCase();
            value.set(word);
            output.collect(value, new IntWritable(1));
        }
    }
}
where deAccent is a function removing accents.
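For reference, the same accent-removal idea can be sketched in Python with the standard unicodedata module (this is an illustration; the Java deAccent implementation itself is not shown here):

```python
import unicodedata

def de_accent(word):
    # Decompose accented characters (e.g. 'é' -> 'e' + combining accent)
    # with NFD normalization, then drop the combining marks ('Mn').
    decomposed = unicodedata.normalize('NFD', word)
    return ''.join(c for c in decomposed if unicodedata.category(c) != 'Mn')
```

For example, `de_accent(u'déjà')` gives `'deja'`.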
In this case, the results are more regular:
Word | Occurrences | Word | Occurrences | Word | Occurrences | Word | Occurrences | Word | Occurrences |
---|---|---|---|---|---|---|---|---|---|
de | 1511822 | je | 1276007 | a | 1047597 | la | 823707 | pas | 685017 |
en | 680956 | le | 620439 | me | 538259 | que | 518545 | c'est | 477853 |
et | 468833 | les | 467209 | ik | 416741 | j'ai | 406732 | un | 399836 |
at | 394207 | i'm | 379167 | ca | 349692 | on | 333387 | in | 316186 |
Now, let's imagine we only want to count words with at least 10 letters, beginning with an 'a'. How can we do that using MapReduce? Of course, we could use the previous algorithm, and then extract the data we care about. But let's be smarter: we can modify the map function directly. The code will then be
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
    String line = value.toString();
    StringTokenizer tokenizer = new StringTokenizer(line);
    while (tokenizer.hasMoreTokens()) {
        String word = tokenizer.nextToken();
        if (word.length() >= 10 && word.charAt(0) == 'a') {
            value.set(word);
            output.collect(value, new IntWritable(1));
        }
    }
}
And the results are
Word | Occurrences | Word | Occurrences | Word | Occurrences | Word | Occurrences | Word | Occurrences |
---|---|---|---|---|---|---|---|---|---|
aujourd'hui | 19942 | anniversaire | 18576 | après-midi | 4547 | afgehandeld | 2465 | absolument | 1941 |
automatically | 1863 | aujourd'hui, | 1640 | apparemment | 1398 | aflevering | 1330 | aujourd'hui | 1309 |
actuellement | 965 | antwoorden | 818 | ahahahahah | 683 | abonnement | 628 | aangesproken | 578 |
achtergrond | 554 | anniversaire, | 531 | aujourd'hui.. | 497 | afgesloten | 492 | aujourd'hui! | 484 |
Finally, if we only want to extract useful words (defined as words with at least 8 characters and beginning with an actual letter), we need this mapper:
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
    String line = value.toString();
    StringTokenizer tokenizer = new StringTokenizer(line);
    while (tokenizer.hasMoreTokens()) {
        String word = tokenizer.nextToken();
        if (word.length() >= 8 && Character.isLetter(word.charAt(0))) {
            value.set(word);
            output.collect(value, new IntWritable(1));
        }
    }
}
In this case, the results are the following:
Word | Occurrences | Word | Occurrences | Word | Occurrences | Word | Occurrences | Word | Occurrences |
---|---|---|---|---|---|---|---|---|---|
vraiment | 47152 | toujours | 44705 | tellement | 31380 | personne | 27850 | West-Vlaanderen | 21341 |
Oost-Vlaanderen | 20516 | aujourd'hui | 19942 | maintenant | 19751 | anniversaire | 18576 | commence | 18395 |
vacances | 18353 | regarder | 17572 | pourquoi | 17107 | Antwerpen | 17091 | richting | 14850 |
meilleur | 12063 | Brussels | 11929 | hahahaha | 11855 | Pourquoi | 11774 | Vlaams-Brabant | 11609 |
This seems interesting: there is a very high number of occurrences of the names of the Flemish provinces in Belgium. This might of course be due to the fact that there are more Dutch-speaking than French-speaking people in the data. Still, they appear so many times that this result might seem counter-intuitive at first sight.
The Matrix-Vector Multiplication using Hadoop
Another example of computation that can be done with Map-Reduce is the Matrix-Vector multiplication.
In this case, we want to compute x = A b.
The MapReduce algorithm is then the following.
- Map: the input is an entry of the matrix A stored as a triplet (i, j, A_ij). The output is the following key-value pair: (i, A_ij * b_j).
- Reduce: the input is a key-values pair (i, [A_ij * b_j for all j]) and the output is (i, x_i), where x_i = sum_j A_ij * b_j.
The Java code for this MapReduce is the following
import java.io.IOException;
import java.io.File;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Iterator;
import java.util.StringTokenizer;
import java.util.Scanner;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class SparseMatrixVector {

    private static double[] b; // The vector

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, IntWritable, DoubleWritable> {

        @Override
        public void map(LongWritable key, Text value, OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString(); // (i, j, A_ij)
            String[] IJAij = line.split(" "); // Divide string into (at least) 3 elements : i, j, and A_ij
            int i = Integer.parseInt(IJAij[0]);
            int j = Integer.parseInt(IJAij[1]);
            double Aij = Double.parseDouble(IJAij[2]);
            output.collect(new IntWritable(i), new DoubleWritable(Aij * b[j-1])); // Output (i, A_ij b_j) CAUTION : j-1 because of Matlab indexing
        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {

        @Override
        public void reduce(IntWritable key, Iterator<DoubleWritable> values,
                OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
                throws IOException {
            double sum = 0.0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new DoubleWritable(sum)); // Output (i, SUM_j A_ij b_j)
        }
    }

    public static void main(String[] args) throws Exception {
        /* Display arguments */
        System.out.println("====================================");
        System.out.println("b vector path : " + args[0]);
        System.out.println("b vector size : " + args[1]);
        System.out.println("A matrix folder path : " + args[2]);
        System.out.println("Ax vector folder path : " + args[3]);
        System.out.println("====================================");

        /* 1) Create the b vector : every map can access it in memory */
        int n = Integer.parseInt(args[1]);
        b = new double[n];
        int i = 0;
        Path b_vector_path = new Path(args[0]);
        FileSystem fs = FileSystem.get(new Configuration());
        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(b_vector_path)));
        String line;
        line = br.readLine();
        while (line != null) {
            if (i >= n) {
                System.out.println("i out of bounds !");
                break; // stop instead of writing past the end of b
            }
            b[i] = Double.parseDouble(line);
            i++;
            line = br.readLine();
        }

        /* 2) Run MapReduce to compute x = A*b */
        JobConf conf = new JobConf(SparseMatrixVector.class);
        conf.setJobName("sparseMatrixVector");
        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputValueClass(DoubleWritable.class);
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[2]));
        FileOutputFormat.setOutputPath(conf, new Path(args[3]));
        JobClient.runJob(conf);
    }
}
This code can then be compiled and run by typing
bin/hadoop com.sun.tools.javac.Main SparseMatrixVector.java
jar cf SparseMatrixVector.jar SparseMatrixVector*.class
bin/hadoop jar SparseMatrixVector.jar SparseMatrixVector b2_vector 200000 A/ output_sparseMatrixVector/
Where
b2_vector
is the path to the vector,
200000
is the size of the vector,
A/
is the path to the folder containing the matrix (which can be divided into small files) and
output_sparseMatrixVector/
is the folder where the results should be written.
Performances
We can now try to assess the performance of MapReduce on this matrix-vector multiplication. In this example, we used sparse tridiagonal matrices.
Matrix size | Execution time [s] |
---|---|
100 | 1.8 |
1 000 | 1.8 |
10 000 | 1.9 |
100 000 | 3.7 |
1 000 000 | 9.6 |
10 000 000 | 85 |
We see that the complexity is asymptotically linear, but at the beginning the algorithm isn't efficient at all. This shows that Hadoop is useful for tackling large problems, but shouldn't be used for solving small problem instances.