Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, cassandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, CentOS, red hat, debian, puppet labs, java, senseiDB
 Search Hadoop and all its subprojects:

Switch to Plain View
MapReduce >> mail # user >> Change the output of Reduce function


Copy link to this message
-
Change the output of Reduce function
I wrote a MapReduce program that performs a grep. I know there is a
Grep example in the Hadoop distribution, but I want to build my own grep
MapReduce job so I can explain it to others.
My problem is that my output shows both the key and the value. I want to show
only the value, since I stored the line number in the value. Example:

00048 [ line 6298 : Jul 25 15:18:14 felipe kernel: [ 2168.644689] wlan0:
associated ]

Here is my code. Thanks,
Felipe

package grep;

import java.io.File;
import java.io.FileReader;
import java.io.LineNumberReader;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

/**
 * Driver for the grep MapReduce job.
 *
 * Usage: Main &lt;in&gt; &lt;out&gt; &lt;regex&gt;
 *
 * Passes the regex and the input's line count to the tasks through the
 * job configuration ("grep.regex" / "grep.lines").
 */
public class Main {

    public static void main(String[] args) throws Exception {

        if (args == null || args.length != 3) {
            System.err.println("Usage: Main <in> <out> <regex>");
            System.exit(-1);
        }

        JobConf conf = new JobConf(Main.class);
        conf.setJobName("grep");

        String input = args[0];
        String output = args[1];
        String regex = args[2];

        // Count the lines of the input so the mapper can size its key
        // buckets. NOTE(review): this reads the input through the local
        // filesystem (java.io.File), so it only works when <in> is a local
        // path, not an HDFS URI — confirm the intended deployment.
        File arquivoLeitura = new File(input);
        LineNumberReader linhaLeitura = new LineNumberReader(new FileReader(
                arquivoLeitura));
        String lines;
        try {
            // Skipping to EOF advances the reader's internal line counter.
            linhaLeitura.skip(arquivoLeitura.length());
            lines = String.valueOf(linhaLeitura.getLineNumber() + 1);
        } finally {
            // Bug fix: the original never closed the reader (resource leak).
            linhaLeitura.close();
        }
        conf.set("grep.regex", regex);
        conf.set("grep.lines", lines);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(MyWritable.class);

        conf.setMapperClass(GrepMapper.class);
        conf.setCombinerClass(GrepReducer.class);
        conf.setReducerClass(GrepReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        JobClient.runJob(conf);
    }
}

package grep;

import java.io.IOException;
import java.text.DecimalFormat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * Mapper that tags every input line with a zero-padded bucket key.
 *
 * Lines are numbered as they arrive; every {@code divisor} lines the bucket
 * counter {@code n} advances, so output keys group roughly equal-sized runs
 * of consecutive lines. The value carries "line <no> : <text>".
 */
public class GrepMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, MyWritable> {

    // Bug fix: these were static in the original, which leaks counter state
    // across mapper instances sharing a JVM (e.g. with task-JVM reuse).
    // Instance fields behave identically for a single task.
    private long line = 1;      // 1-based number of the next input line
    private long n = 0;         // current bucket key
    private long divisor = 1;   // lines per bucket, set in configure()
    private Text k = new Text();

    public void map(LongWritable key, Text value,
            OutputCollector<Text, MyWritable> output, Reporter reporter)
            throws IOException {
        String str = value.toString();
        MyWritable text = new MyWritable("line " + line + " : " + str);
        // Advance the bucket every `divisor` lines.
        if ((line % divisor) == 0) {
            n++;
        }
        k.set(customFormat("00000", n));
        output.collect(k, text);
        line++;
    }

    @Override
    public void configure(JobConf job) {
        long qtdLines = Long.parseLong(job.get("grep.lines"));
        // Bug fix / generalization: the original 14-branch if/else ladder
        // implemented divisor = 10 * ceil(qtdLines / 500) but stopped at
        // 7000 lines, silently leaving divisor = 1 for larger inputs.
        // Compute it in closed form so any input size gets a consistent
        // bucket width; floor at 10 to match the original's smallest case
        // (and avoid a zero divisor for empty input).
        divisor = Math.max(10, 10 * ((qtdLines + 499) / 500));
    }

    /**
     * Formats {@code value} with the given DecimalFormat pattern
     * (e.g. "00000" zero-pads to five digits).
     */
    static public String customFormat(String pattern, double value) {
        DecimalFormat myFormatter = new DecimalFormat(pattern);
        return myFormatter.format(value);
    }
}

package grep;

import java.io.IOException;
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * Reducer (also used as combiner) that keeps only the values whose text
 * matches the regex supplied in the "grep.regex" job property.
 */
public class GrepReducer extends MapReduceBase implements
        Reducer<Text, MyWritable, Text, MyWritable> {

    // Compiled once per task; Pattern compilation is expensive.
    private Pattern pattern;

    @Override
    public void configure(JobConf job) {
        pattern = Pattern.compile(job.get("grep.regex"));
    }

    public void reduce(Text key, Iterator<MyWritable> values,
            OutputCollector<Text, MyWritable> output, Reporter reporter)
            throws IOException {

        while (values.hasNext()) {
            String text = (String) values.next().get();
            // Bug fix: the original looped `while (matcher.find())`, emitting
            // the same line once per match within it — duplicated output
            // records, further multiplied by the combiner pass. A grep should
            // emit a matching line exactly once.
            if (pattern.matcher(text).find()) {
                output.collect(key, new MyWritable(text));
            }
        }
    }
}

package grep;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Array;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;

public class MyWritable implements Writable, Configurable {

private Class declaredClass;
private Object instance;
private Configuration conf;

public MyWritable() {
}

public MyWritable(Object instance) {
set(instance);
}

public MyWritable(Class declaredClass, Object instance) {
this.declaredClass = declaredClass;
this.instance = instance;
}

/** Return the instance, or null if none. */
public Object get() {
return instance;
}

/** Return the class this is meant to be. */
public Class getDeclaredClass() {
return declaredClass;
}

/** Reset the instance. */
public void set(Object instance) {
this.declaredClass = instance.getClass();
this.instance = instance;
}

public String toString() {
return "[ " + instance + " ]";
}

public void readFields(DataInput in) throws IOException {
readObject(in, this, this.conf);
}

public voi
+
Shahab Yunus 2013-07-25, 19:07
+
Felipe Gutierrez 2013-07-25, 19:18
+
Mohammad Tariq 2013-07-25, 19:26
+
Mohammad Tariq 2013-07-25, 19:27
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, cassandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, CentOS, red hat, debian, puppet labs, java, senseiDB