map phase does not read intermediate results with SequenceFileInputFormat
Hi all.

I have a MapReduce program with two jobs. The second job's key and value come
from the first job's output, but I think the second mapper does not receive
the first job's results. In other words, my second job does not seem to read
the output of my first job. What should I do?
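
(For context: below is a minimal sketch, with a class name of my own, of the
mapper signature SequenceFileInputFormat normally drives when the upstream job
writes <Text, DoubleWritable> pairs. Each map() call receives one key/value
record; an overload with different parameter types does not override
Mapper.map(), so the inherited identity mapper would run instead.)

public static class SequenceFileMapper
        extends Mapper<Text, DoubleWritable, Text, DoubleWritable> {
    @Override
    public void map(Text key, DoubleWritable value, Context context)
            throws IOException, InterruptedException {
        // One record per call; the parameter types must match what job1 wrote.
        context.write(key, value); // pass through, or transform here
    }
}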

Here is the code:

public class dewpoint extends Configured implements Tool
{
    private static final Logger logger = LoggerFactory.getLogger(dewpoint.class);

    static final String KEYSPACE = "weather";
    static final String COLUMN_FAMILY = "user";
    private static final String OUTPUT_PATH1 = "/tmp/intermediate1";
    private static final String OUTPUT_PATH2 = "/tmp/intermediate2";
    private static final String OUTPUT_PATH3 = "/tmp/intermediate3";
    private static final String INPUT_PATH1 = "/tmp/intermediate1";

    public static void main(String[] args) throws Exception
    {
        // Propagate the tool's exit code instead of always exiting with 0.
        System.exit(ToolRunner.run(new Configuration(), new dewpoint(), args));
    }

///////////////////////////////////////////////////////////

public static class dpmap1
        extends Mapper<Map<String, ByteBuffer>, Map<String, ByteBuffer>, Text, DoubleWritable>
{
    DoubleWritable val1 = new DoubleWritable();
    Text word = new Text();
    String date;
    float temp;

    // CqlPagingInputFormat supplies Map<String, ByteBuffer> for both the
    // partition keys and the columns, so both parameters use String keys
    // (the original Map<FloatWritable, ByteBuffer> could never match the
    // string comparison below).
    public void map(Map<String, ByteBuffer> keys, Map<String, ByteBuffer> columns,
            Context context) throws IOException, InterruptedException
    {
        // Pull the row's "date" key out of the partition keys.
        for (Entry<String, ByteBuffer> key : keys.entrySet())
        {
            //System.out.println(key.getKey());
            if (!"date".equals(key.getKey()))
                continue;
            date = ByteBufferUtil.string(key.getValue());
            word.set(date);
        }
        // Pull the "temprature" column (name as defined in the schema).
        for (Entry<String, ByteBuffer> column : columns.entrySet())
        {
            if (!"temprature".equals(column.getKey()))
                continue;
            temp = ByteBufferUtil.toFloat(column.getValue());
            val1.set(temp);
            //System.out.println(temp);
        }
        context.write(word, val1);
    }
}

///////////////////////////////////////////////////////////

public static class dpred1 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
{
    @Override
    public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException
    {
        double beta = 17.62;
        double landa = 243.12;
        DoubleWritable result1 = new DoubleWritable();
        DoubleWritable result2 = new DoubleWritable();
        // Accumulate all temperatures for this key into beta (product)
        // and landa (sum).
        for (DoubleWritable val : values) {
            //System.out.println(val.get());
            beta *= val.get();
            landa += val.get();
        }
        result1.set(beta);
        result2.set(landa);

        // Two records per key: beta first, then landa.
        context.write(key, result1);
        context.write(key, result2);
    }
}
///////////////////////////////////////////////////////////

public static class dpmap2 extends Mapper<Text, DoubleWritable, Text, DoubleWritable> {

    Text key2 = new Text();
    double temp1, temp2 = 0;

    // This overload does not override Mapper.map(Text, DoubleWritable, Context);
    // compare the sketch near the top of this message.
    public void map(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        String[] sp = values.toString().split("\t");
        for (int i = 0; i < sp.length; i += 4) {
            //key2.set(sp[i]);
            System.out.println(sp[i]);
        }
        for (int j = 1; j < sp.length; j += 4) {
            temp1 = Double.valueOf(sp[j]);
        }
        for (int k = 3; k < sp.length; k += 4) {
            temp2 = Double.valueOf(sp[k]);
        }
        context.write(key2, new DoubleWritable(temp2 / temp1));
    }
}

///////////////////////////////////////////////////////////
public static class dpred2 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
{
    @Override
    public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException
    {
        double alpha = 6.112;
        double tmp = 0;
        DoubleWritable result3 = new DoubleWritable();
        // Only the last value survives this loop; earlier ones are overwritten.
        for (DoubleWritable val : values) {
            System.out.println(val.get());
            tmp = alpha * Math.pow(Math.E, val.get());
        }
        result3.set(tmp);
        context.write(key, result3);
    }
}
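
(Aside: the constants 17.62, 243.12, and 6.112 look like the Magnus
approximation for dew point, which has a closed form worth comparing against.
A standalone sketch; the method and variable names are mine, and this is my
reading of the intent rather than code from the job:)

public static double dewPointC(double tempC, double relHumidityPct) {
    final double BETA = 17.62;     // Magnus coefficient (dimensionless)
    final double LAMBDA = 243.12;  // Magnus coefficient (degrees Celsius)
    double gamma = Math.log(relHumidityPct / 100.0)
                 + (BETA * tempC) / (LAMBDA + tempC);
    return (LAMBDA * gamma) / (BETA - gamma);  // dew point in Celsius
}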
///////////////////////////////////////////////////////////
public int run(String[] args) throws Exception
{
    // Job 1: read (date, temperature) pairs from Cassandra and write a
    // SequenceFile of <Text, DoubleWritable> to OUTPUT_PATH1.
    Job job1 = new Job(getConf(), "DewPoint");
    job1.setJarByClass(dewpoint.class);
    job1.setMapperClass(dpmap1.class);
    job1.setOutputFormatClass(SequenceFileOutputFormat.class);
    job1.setCombinerClass(dpred1.class);
    job1.setReducerClass(dpred1.class);
    job1.setMapOutputKeyClass(Text.class);
    job1.setMapOutputValueClass(DoubleWritable.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(DoubleWritable.class);
    FileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH1));
    job1.setInputFormatClass(CqlPagingInputFormat.class);

    ConfigHelper.setInputRpcPort(job1.getConfiguration(), "9160");
    ConfigHelper.setInputInitialAddress(job1.getConfiguration(), "localhost");
    ConfigHelper.setInputColumnFamily(job1.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
    ConfigHelper.setInputPartitioner(job1.getConfiguration(), "Murmur3Partitioner");

    CqlConfigHelper.setInputCQLPageRowSize(job1.getConfiguration(), "3");
    job1.waitForCompletion(true);

     /***************************************/

    // Job 2: read job1's SequenceFile back and write plain text.
    if (job1.isSuccessful()) {
        Job job2 = new Job(getConf(), "DewPoint");
        job2.setJarByClass(dewpoint.class);
        job2.setMapperClass(dpmap2.class);
        job2.setCombinerClass(dpred2.class);
        job2.setReducerClass(dpred2.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(DoubleWritable.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(DoubleWritable.class);
        job2.setOutputFormatClass(TextOutputFormat.class);
        job2.setInputFormatClass(SequenceFileInputFormat.class);
        FileInputFormat.addInputPath(job2, new Path(OUTPUT_PATH1));
        // Assumed: the original posting is cut off mid-statement here;
        // OUTPUT_PATH2 is inferred from the declarations at the top.
        FileOutputFormat.setOutputPath(job2, new Path(OUTPUT_PATH2));
        job2.waitForCompletion(true);
    }
    return 0;
}
}
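
(One more sketch for comparison, since dpred1 and dpred2 are also registered
as combiners: Hadoop may run a combiner zero or more times on partial groups,
so it must be safe to re-apply and must emit the same key/value types it
consumes. A reducer that emits two records per key, as dpred1 does, breaks
that contract. The class name below is mine:)

public static class SumCombiner
        extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
    @Override
    public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        // Summing is associative and commutative, so running this zero,
        // one, or several times before the real reducer is safe.
        double sum = 0;
        for (DoubleWritable v : values) {
            sum += v.get();
        }
        context.write(key, new DoubleWritable(sum));
    }
}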