MapReduce, mail # user - Change the output of Reduce function


Felipe Gutierrez 2013-07-25, 18:58
I wrote a MapReduce program that implements a grep function. I know there is a
Grep example among the Hadoop examples, but I want to write my own grep
MapReduce job so I can explain it to others.
My problem is that the output shows both the key and the value. I want to show
only the value, since the line number is already stored in the value. Example:

00048 [ line 6298 : Jul 25 15:18:14 felipe kernel: [ 2168.644689] wlan0:
associated ]
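
(For context, the "00048" prefix in that line is the reducer's output key, and
the square brackets come from MyWritable.toString(). TextOutputFormat writes
the key, a tab, and then the value, but skips the key entirely when it is a
NullWritable, so one way to get value-only lines is to emit NullWritable.get()
as the reduce output key; sketches of that change follow the Main and
GrepReducer classes below.)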

Here is my code. Thanks,
Felipe

package grep;

import java.io.File;
import java.io.FileReader;
import java.io.LineNumberReader;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class Main {

    public static void main(String[] args) throws Exception {

        if (args == null || args.length != 3) {
            System.err.println("Usage: Main <in> <out> <regex>");
            System.exit(-1);
        }

        JobConf conf = new JobConf(Main.class);
        conf.setJobName("grep");

        String input = args[0];
        String output = args[1];
        String regex = args[2];

        // Count the lines of the (local) input file so the mapper can size its key buckets.
        File arquivoLeitura = new File(input);
        LineNumberReader linhaLeitura = new LineNumberReader(new FileReader(
                arquivoLeitura));
        linhaLeitura.skip(arquivoLeitura.length());
        String lines = String.valueOf(linhaLeitura.getLineNumber() + 1);
        linhaLeitura.close();

        // Make the regex and the line count available to the tasks via the job configuration.
        conf.set("grep.regex", regex);
        conf.set("grep.lines", lines);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(MyWritable.class);

        conf.setMapperClass(GrepMapper.class);
        conf.setCombinerClass(GrepReducer.class);
        conf.setReducerClass(GrepReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        JobClient.runJob(conf);
    }
}
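
If the reduce output key were switched to NullWritable (one way to print only
the value; see the reducer sketch after GrepReducer below), the driver would
also have to declare the map output types explicitly, since they would no
longer match the job output types. A sketch of those lines, assuming a
hypothetical GrepValueOnlyReducer (sketched below) and an added import of
org.apache.hadoop.io.NullWritable:

        // Mapper and combiner still work with Text keys and MyWritable values,
        // so setCombinerClass(GrepReducer.class) can stay as it is.
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(MyWritable.class);
        // Final job output: no key, only the value.
        conf.setOutputKeyClass(NullWritable.class);
        conf.setOutputValueClass(MyWritable.class);
        conf.setReducerClass(GrepValueOnlyReducer.class);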

package grep;

import java.io.IOException;
import java.text.DecimalFormat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class GrepMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, MyWritable> {

    private static long line = 1;
    private static long n = 0;
    private static long divisor = 1;
    private static long qtdLines = 0;
    private Text k = new Text();

    public void map(LongWritable key, Text value,
            OutputCollector<Text, MyWritable> output, Reporter reporter)
            throws IOException {
        String str = value.toString();
        MyWritable text = new MyWritable("line " + line + " : " + str);
        // Every 'divisor' lines, move on to the next output key, so each key
        // groups a block of consecutive input lines.
        if ((line % divisor) == 0) {
            n++;
        }
        k.set(customFormat("00000", n));
        output.collect(k, text);
        line++;
    }

    @Override
    public void configure(JobConf job) {
        // Pick the block size from the total line count passed in by the driver.
        qtdLines = Long.parseLong(job.get("grep.lines"));
        if (qtdLines <= 500) {
            divisor = 10;
        } else if (qtdLines <= 1000) {
            divisor = 20;
        } else if (qtdLines <= 1500) {
            divisor = 30;
        } else if (qtdLines <= 2000) {
            divisor = 40;
        } else if (qtdLines <= 2500) {
            divisor = 50;
        } else if (qtdLines <= 3000) {
            divisor = 60;
        } else if (qtdLines <= 3500) {
            divisor = 70;
        } else if (qtdLines <= 4000) {
            divisor = 80;
        } else if (qtdLines <= 4500) {
            divisor = 90;
        } else if (qtdLines <= 5000) {
            divisor = 100;
        } else if (qtdLines <= 5500) {
            divisor = 110;
        } else if (qtdLines <= 6000) {
            divisor = 120;
        } else if (qtdLines <= 6500) {
            divisor = 130;
        } else if (qtdLines <= 7000) {
            divisor = 140;
        }
    }

    // Zero-pads the group number, e.g. customFormat("00000", 48) -> "00048".
    static public String customFormat(String pattern, double value) {
        DecimalFormat myFormatter = new DecimalFormat(pattern);
        return myFormatter.format(value);
    }
}
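
As a side note, the divisor ladder in configure() follows a single pattern
(10 per started block of 500 lines, capped at 140), so it could be computed
directly. A small sketch that keeps the original behaviour, including leaving
divisor at 1 for inputs above 7000 lines:

        // Equivalent to the if/else ladder above: 10 per started block of 500
        // lines, capped at 140; above 7000 lines divisor stays at its default of 1.
        if (qtdLines <= 7000) {
            divisor = Math.max(10, ((qtdLines + 499) / 500) * 10);
        }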

package grep;

import java.io.IOException;
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class GrepReducer extends MapReduceBase implements
        Reducer<Text, MyWritable, Text, MyWritable> {

    private Pattern pattern;

    @Override
    public void configure(JobConf job) {
        pattern = Pattern.compile(job.get("grep.regex"));
    }

    public void reduce(Text key, Iterator<MyWritable> values,
            OutputCollector<Text, MyWritable> output, Reporter reporter)
            throws IOException {
        // Emit only the values whose text matches the configured regex.
        while (values.hasNext()) {
            String text = (String) values.next().get();
            Matcher matcher = pattern.matcher(text);
            while (matcher.find()) {
                output.collect(key, new MyWritable(text));
            }
        }
    }
}
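
For reference, a minimal sketch of a value-only variant of this reducer (a
hypothetical GrepValueOnlyReducer, not part of the code above): the filtering
is unchanged, only the output key type becomes NullWritable, so that
TextOutputFormat writes just the value on each line.

package grep;

import java.io.IOException;
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Same regex filter as GrepReducer, but the output key is NullWritable, so
// TextOutputFormat omits the key and the tab and prints only the value.
public class GrepValueOnlyReducer extends MapReduceBase implements
        Reducer<Text, MyWritable, NullWritable, MyWritable> {

    private Pattern pattern;

    @Override
    public void configure(JobConf job) {
        pattern = Pattern.compile(job.get("grep.regex"));
    }

    public void reduce(Text key, Iterator<MyWritable> values,
            OutputCollector<NullWritable, MyWritable> output, Reporter reporter)
            throws IOException {
        while (values.hasNext()) {
            String text = (String) values.next().get();
            Matcher matcher = pattern.matcher(text);
            while (matcher.find()) {
                output.collect(NullWritable.get(), new MyWritable(text));
            }
        }
    }
}

The surrounding "[ ... ]" would still appear, because it comes from
MyWritable.toString(); collecting a plain Text value (and calling
conf.setOutputValueClass(Text.class)) would drop the brackets as well.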

package grep;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Array;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;

public class MyWritable implements Writable, Configurable {

    private Class declaredClass;
    private Object instance;
    private Configuration conf;

    public MyWritable() {
    }

    public MyWritable(Object instance) {
        set(instance);
    }

    public MyWritable(Class declaredClass, Object instance) {
        this.declaredClass = declaredClass;
        this.instance = instance;
    }

    /** Return the instance, or null if none. */
    public Object get() {
        return instance;
    }

    /** Return the class this is meant to be. */
    public Class getDeclaredClass() {
        return declaredClass;
    }

    /** Reset the instance. */
    public void set(Object instance) {
        this.declaredClass = instance.getClass();
        this.instance = instance;
    }

    public String toString() {
        return "[ " + instance + " ]";
    }

    public void readFields(DataInput in) throws IOException {
        readObject(in, this, this.conf);
    }

public voi