-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathInvertedIndex.java~
More file actions
92 lines (80 loc) · 3.43 KB
/
Copy pathInvertedIndex.java~
File metadata and controls
92 lines (80 loc) · 3.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class InvertedIndex {
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
JobConf conf;
public void configure (JobConf job){
this.conf = job;
}
public void map (LongWritable docId, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
//retrieve # keywords from JobConf
int argc = Integer.parseInt (conf.get ("argc"));
//get the current file name
FileSplit fileSplit = (FileSplit) reporter.getInputSplit();
String filename = "" + fileSplit.getPath().getName();
//read each line of file split
String line = value.toString();
//tokenize each word with a space
StringTokenizer tokenizer = new StringTokenizer(line);
//check if it is one of the given keywords
HashMap<String, Integer> keywords = new HashMap<String, Integer>();
int count = 0;
for(int i=0; i<argc; i++){
keywords.put(conf.get("keyword"+i), count);
}
String token = new String();
int sum = 0;
while (tokenizer.hasMoreTokens()) {
token = tokenizer.nextToken();
if(keywords.containsKey(token)){
output.collect(new Text(token), new Text(filename));
}
}
//generate and pass pairs of a keyword and a document id to Reduce
}
}
//receive one of the keywords and all document ids
public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
private Text result=new Text();
private Text value = new Text();
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
HashMap m = new HashMap();
int count = 0;
while(values.hasNext()){
String str = values.next().toString();
if((m!=null)&&(m.get(str)!=null)){
count = (int)m.get(str);
m.put(str, ++count);
} else {
// m.put(str, 1);
}
}
output.collect(key, new Text(m.toString()));
}
}
public static void main(String[] args) throws Exception {
//input format:
//hadoop jar invertedindexes.jar InvertedIndexes input output keyword1 keyword2 ...
JobConf conf = new JobConf(InvertedIndex.class);
conf.setJobName("InvertedIndex");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.set("argc", String.valueOf(args.length-2));
for(int i=0; i<args.length-2;i++){
conf.set("keyword"+i, args[i+2]);
}
JobClient.runJob(conf);
}
}