Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, cassandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB
 Search Hadoop and all its subprojects:

Switch to Threaded View
MapReduce >> mail # user >> multi-thread mapreduce


Copy link to this message
-
multi-thread mapreduce
Dears,

I suddenly got the idea of running MapReduce jobs in a multi-threaded way.
I don't know whether it can work. Could anyone give me some advice?
Here is the java code:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class LogProcessApp extends Thread {

 private static String sep;
 private String x2;
 private String x3;

 public LogProcessApp(String arg1,String arg2,String arg3){
  sep=arg1;
  x2=arg2;
  x3=arg3;
 }

 public static class CM extends Mapper<LongWritable, Text, Text, Text>{
  private Text keyvar = new Text();
  private Text valvar = new Text();
  public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
   String line = value.toString();
   try{
    String data[] = line.split(sep);
    keyvar.set(data[0]);
    valvar.set(data[1]);
    context.write(keyvar,valvar);
   } catch (Exception e) {
    return;
   }
  }
 }

 public void run(){
  Configuration conf = new Configuration();
  Job job = null;
  try {
   job = new Job(conf);
  } catch (IOException e1) {
   // TODO Auto-generated catch block
   e1.printStackTrace();
  }

  job.setJobName("XXXJob");
  job.setJarByClass(CMR.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  job.setMapperClass(CM.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  try {
   FileInputFormat.addInputPath(job, new Path(x2));
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
  FileOutputFormat.setOutputPath(job, new Path(x3));

  try {
   job.submit();
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  } catch (ClassNotFoundException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }

 }
 public static void main(String args[]){
  LogProcessApp lpa1=new LogProcessApp(args[0],args[1],args[3]);
  LogProcessApp lpa2=new LogProcessApp(args[4],args[5],args[6]);
  LogProcessApp lpa3=new LogProcessApp(args[7],args[8],args[9]);
  lpa1.start();
  lpa2.start();
  lpa3.start();
 }
}
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, cassandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB