Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Plain View
Hadoop, mail # user - map phase does not read intermediate results with SequenceFileInputFormat


Copy link to this message
-
map phase does not read intermediate results with SequenceFileInputFormat
Anseh Danesh 2013-10-25, 05:46
Hi all.

I have a MapReduce program with two jobs. The second job's keys and values
come from the first job's output, but I think the second map phase does not
receive the results of the first job. In other words, I think my second job
did not read the output of my first job. What should I do?

here is the code:

public class dewpoint extends Configured implements Tool
{
  // Class-level logger.
  private static final Logger logger = LoggerFactory.getLogger(dewpoint.class);

// Keyspace and column family passed to ConfigHelper.setInputColumnFamily for the first job's input.
static final String KEYSPACE = "weather";
static final String COLUMN_FAMILY = "user";
// Output directory of the first job; the same path is added as the input path of the second job.
private static final String OUTPUT_PATH1 = "/tmp/intermediate1";
// NOTE(review): OUTPUT_PATH2 and OUTPUT_PATH3 are not referenced in the visible part of this file - confirm they are used.
private static final String OUTPUT_PATH2 = "/tmp/intermediate2";
private static final String OUTPUT_PATH3 = "/tmp/intermediate3";
// NOTE(review): same value as OUTPUT_PATH1 and not referenced in the visible part of this file - confirm it is needed.
private static final String INPUT_PATH1 = "/tmp/intermediate1";

public static void main(String[] args) throws Exception
{

    ToolRunner.run(new Configuration(), new dewpoint(), args);
    System.exit(0);
}

///////////////////////////////////////////////////////////

public static class dpmap1 extends Mapper<Map<String, ByteBuffer>,
Map<FloatWritable, ByteBuffer>, Text, DoubleWritable>
{
    DoubleWritable val1 = new DoubleWritable();
    Text word = new Text();
    String date;
    float temp;
    public void map(Map<String, ByteBuffer> keys, Map<FloatWritable,
ByteBuffer> columns, Context context) throws IOException,
InterruptedException
    {

         for (Entry<String, ByteBuffer> key : keys.entrySet())
         {
             //System.out.println(key.getKey());
             if (!"date".equals(key.getKey()))
                 continue;
             date = ByteBufferUtil.string(key.getValue());
             word.set(date);
         }
        for (Entry<FloatWritable, ByteBuffer> column : columns.entrySet())
        {
            if (!"temprature".equals(column.getKey()))
                continue;
            temp = ByteBufferUtil.toFloat(column.getValue());
            val1.set(temp);
            //System.out.println(temp);
       }
        context.write(word, val1);
    }
}

///////////////////////////////////////////////////////////

public static class dpred1 extends Reducer<Text, DoubleWritable, Text,
DoubleWritable>
{
   public void reduce(Text key, Iterable<DoubleWritable> values,
Context context) throws IOException, InterruptedException
    {
        double beta = 17.62;
        double landa = 243.12;
        DoubleWritable result1 = new DoubleWritable();
        DoubleWritable result2 = new DoubleWritable();
         for (DoubleWritable val : values){
         //  System.out.println(val.get());
           beta *= val.get();
           landa+=val.get();
           }
         result1.set(beta);
         result2.set(landa);

         context.write(key, result1);
         context.write(key, result2);
     }
}
///////////////////////////////////////////////////////////

public static class dpmap2 extends Mapper <Text, DoubleWritable, Text,
DoubleWritable>{

    Text key2 = new Text();
    double temp1, temp2 =0;

    public void map(Text key, Iterable<DoubleWritable> values, Context
context) throws IOException, InterruptedException {
        String[] sp = values.toString().split("\t");
        for (int i=0; i< sp.length; i+=4)
            //key2.set(sp[i]);
        System.out.println(sp[i]);
            for(int j=1;j< sp.length; j+=4)
                temp1 = Double.valueOf(sp[j]);
                for (int k=3;k< sp.length; k+=4)
                    temp2 = Double.valueOf(sp[k]);
        context.write(key2, new DoubleWritable(temp2/temp1));

    }
}

///////////////////////////////////////////////////////////
public static class dpred2 extends Reducer<Text, DoubleWritable, Text,
DoubleWritable>
{
   public void reduce(Text key, Iterable<DoubleWritable> values,
Context context) throws IOException, InterruptedException
    {

       double alpha = 6.112;
        double tmp = 0;
        DoubleWritable result3 = new DoubleWritable();
         for (DoubleWritable val : values){
             System.out.println(val.get());
             tmp = alpha*(Math.pow(Math.E, val.get()));

         }
         result3.set(tmp);
         context.write(key, result3);
  }
}
///////////////////////////////////////////////////////////
public int run(String[] args) throws Exception
{

     Job job1 = new Job(getConf(), "DewPoint");
     job1.setJarByClass(dewpoint.class);
     job1.setMapperClass(dpmap1.class);
     job1.setOutputFormatClass(SequenceFileOutputFormat.class);
     job1.setCombinerClass(dpred1.class);
     job1.setReducerClass(dpred1.class);
     job1.setMapOutputKeyClass(Text.class);
     job1.setMapOutputValueClass(DoubleWritable.class);
     job1.setOutputKeyClass(Text.class);
     job1.setOutputValueClass(DoubleWritable.class);
     FileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH1));
     job1.setInputFormatClass(CqlPagingInputFormat.class);

     ConfigHelper.setInputRpcPort(job1.getConfiguration(), "9160");
     ConfigHelper.setInputInitialAddress(job1.getConfiguration(), "localhost");
     ConfigHelper.setInputColumnFamily(job1.getConfiguration(),
KEYSPACE, COLUMN_FAMILY);
     ConfigHelper.setInputPartitioner(job1.getConfiguration(),
"Murmur3Partitioner");

     CqlConfigHelper.setInputCQLPageRowSize(job1.getConfiguration(), "3");
     job1.waitForCompletion(true);

     /***************************************/

     if (job1.isSuccessful()){
     Job job2 = new Job(getConf(), "DewPoint");
     job2.setJarByClass(dewpoint.class);
     job2.setMapperClass(dpmap2.class);
     job2.setCombinerClass(dpred2.class);
     job2.setReducerClass(dpred2.class);
     job2.setMapOutputKeyClass(Text.class);
     job2.setMapOutputValueClass(DoubleWritable.class);
     job2.setOutputKeyClass(Text.class);
     job2.setOutputValueClass(DoubleWritable.class);
     job2.setOutputFormatClass(TextOutputFormat.class);
     job2.setInputFormatClass(SequenceFileInputFormat.class);
     FileInputFormat.addInputPath(job2, new Path(OUTPUT_PATH1));
     FileOutputFormat.setOutputPath(job2,
+
Robin East 2013-10-25, 10:03