Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
MapReduce, mail # user - multi-thread mapreduce


Copy link to this message
-
multi-thread mapreduce
Yu Yang 2012-12-12, 10:09
Dears,

I suddenly got this idea to run MapReduce jobs in a multi-threaded way.
I don't know if it can work. Could anyone give me some advice?
Here is the java code:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class LogProcessApp extends Thread {

 private static String sep;
 private String x2;
 private String x3;

 public LogProcessApp(String arg1,String arg2,String arg3){
  sep=arg1;
  x2=arg2;
  x3=arg3;
 }

 public static class CM extends Mapper<LongWritable, Text, Text, Text>{
  private Text keyvar = new Text();
  private Text valvar = new Text();
  public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
   String line = value.toString();
   try{
    String data[] = line.split(sep);
    keyvar.set(data[0]);
    valvar.set(data[1]);
    context.write(keyvar,valvar);
   } catch (Exception e) {
    return;
   }
  }
 }

 public void run(){
  Configuration conf = new Configuration();
  Job job = null;
  try {
   job = new Job(conf);
  } catch (IOException e1) {
   // TODO Auto-generated catch block
   e1.printStackTrace();
  }

  job.setJobName("XXXJob");
  job.setJarByClass(CMR.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  job.setMapperClass(CM.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  try {
   FileInputFormat.addInputPath(job, new Path(x2));
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
  FileOutputFormat.setOutputPath(job, new Path(x3));

  try {
   job.submit();
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  } catch (ClassNotFoundException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }

 }
 public static void main(String args[]){
  LogProcessApp lpa1=new LogProcessApp(args[0],args[1],args[3]);
  LogProcessApp lpa2=new LogProcessApp(args[4],args[5],args[6]);
  LogProcessApp lpa3=new LogProcessApp(args[7],args[8],args[9]);
  lpa1.start();
  lpa2.start();
  lpa3.start();
 }
}