Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
HDFS, mail # user - Re: Issue with Reduce Side join using datajoin package


Copy link to this message
-
Re: Issue with Reduce Side join using datajoin package
Vikas Jadhav 2013-01-31, 17:44
***************source ****************

public class MyJoin extends Configured implements Tool {

    public static class MapClass extends DataJoinMapperBase {

        protected Text generateInputTag(String inputFile) {
            System.out.println("Starting generateInputTag() : "+inputFile);
            String datasource = inputFile.split("-")[0];
            return new Text(datasource);
        }

        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            System.out.println(" Statring generateGroupKey() : "+aRecord);
            String line = ((Text) aRecord.getData()).toString();
            String[] tokens = line.split(",");
            String groupKey = tokens[0];
            return new Text(groupKey);
        }

        protected TaggedMapOutput generateTaggedMapOutput(Object value) {
            System.out.println("starting  generateTaggedMapOutput() value
: "+value);
            TaggedWritable retv = new TaggedWritable((Text) value);
            retv.setTag(this.inputTag);
            return retv;
        }
    }

    public static class Reduce extends DataJoinReducerBase {

        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
             System.out.println("combine :");
            if (tags.length < 2) return null;
            String joinedStr = "";
            for (int i=0; i<values.length; i++) {
                if (i > 0) joinedStr += ",";
                TaggedWritable tw = (TaggedWritable) values[i];
                String line = ((Text) tw.getData()).toString();
                String[] tokens = line.split(",", 2);
                joinedStr += tokens[1];
            }
            TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
            retv.setTag((Text) tags[0]);
            return retv;
        }
    }

    public static class TaggedWritable extends TaggedMapOutput {

        private Writable data;
        public TaggedWritable()
        {
            this.tag = new Text();

        }//end empty( taking no parameters) constructor TaggedWritable

        public TaggedWritable(Writable data) {

            this.tag = new Text("");
            this.data = data;
        }

        public Writable getData() {
            return data;
        }

        public void write(DataOutput out) throws IOException {
            //System.out.println(");
            this.tag.write(out);
            this.data.write(out);
            System.out.println("Tag  :"+tag+" Data  :"+ data);
        }

        /*
        public void readFields(DataInput in) throws IOException {
            System.out.println(" Starting short readFields(): "+ in);
            this.tag.readFields(in);
            this.data.readFields(in);
        } */
        public void readFields(DataInput in) throws IOException {
            System.out.println(" Starting short readFields(): "+ in);
            this.tag.readFields(in);
            String w = in.toString();
            if(this.data == null)
                try {
                    this.data =(Writable)
ReflectionUtils.newInstance(Class.forName(w), null);
                } catch (ClassNotFoundException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            this.data.readFields(in);
        }
    }
    public int run(String[] args) throws Exception {
        System.out.println("Starting run() Method:");
        Configuration conf = getConf();
        conf.addResource(new
Path("/home/vikas/project/hadoop-1.0.3/conf/core-site.xml"));
        conf.addResource(new
Path("/home/vikas/project/hadoop-1.0.3/conf/mapred-site.xml"));
        conf.addResource(new
Path("/home/vikas/project/hadoop-1.0.3/conf/hdfs-site.xml"));
        JobConf job = new JobConf(conf, MyJoin.class);
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);

        FileInputFormat.setInputPaths(job, in);

        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("DataJoin_cust X order");

        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TaggedWritable.class);
        job.set("mapred.textoutputformat.separator", ",");
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Starting main() function:");
        int res = ToolRunner.run(new Configuration(),
                                 new MyJoin(),
                                 args);

        System.exit(res);
    }
}

************************* error output *********************************************
13/01/31 23:04:26 INFO util.NativeCodeLoader: Loaded the native-hadoop
library
13/01/31 23:04:26 WARN snappy.LoadSnappy: Snappy native library not loaded
13/01/31 23:04:26 INFO mapred.FileInputFormat: Total input paths to process
13/01/31 23:04:26 INFO mapred.JobClient: Running job: job_201301312254_0004
13/01/31 23:04:27 INFO mapred.JobClient:  map 0% reduce 0%
13/01/31 23:04:41 INFO mapred.JobClient:  map 66% reduce 0%
13/01/31 23:04:47 INFO mapred.JobClient:  map 100% reduce 0%
13/01/31 23:04:50 INFO mapred.JobClient:  map 100% reduce 22%
13/01/31 23:04:58 INFO mapred.JobClient: Task Id :
attempt_201301312254_0004_r_000000_0, Status : FAILED
java.lang.NullPointerException
    at MyJoin$TaggedWritable.readFields(MyJoin.java:125)
    at
org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
    at
org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
    at
org.apache.hadoop.mapred.Task$ValuesIterator.readNextValue(Task.java:1271)
    at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:1211)
    at
org.apache.hadoop.mapred.Reduc