Fwd: Issue with Reduce Side join using datajoin package
---------- Forwarded message ----------
From: Vikas Jadhav <[EMAIL PROTECTED]>
Date: Thu, Jan 31, 2013 at 11:14 PM
Subject: Re: Issue with Reduce Side join using datajoin package
To: [EMAIL PROTECTED]
*************** source ****************

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJoin extends Configured implements Tool {

    public static class MapClass extends DataJoinMapperBase {

        protected Text generateInputTag(String inputFile) {
            System.out.println("Starting generateInputTag() : "+inputFile);
            String datasource = inputFile.split("-")[0];
            return new Text(datasource);
        }

        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            System.out.println(" Statring generateGroupKey() : "+aRecord);
            String line = ((Text) aRecord.getData()).toString();
            String[] tokens = line.split(",");
            String groupKey = tokens[0];
            return new Text(groupKey);
        }

        protected TaggedMapOutput generateTaggedMapOutput(Object value) {
            System.out.println("starting generateTaggedMapOutput() value : "+value);
            TaggedWritable retv = new TaggedWritable((Text) value);
            retv.setTag(this.inputTag);
            return retv;
        }
    }
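    // Note: DataJoinMapperBase calls generateInputTag() with the name of the
    // current input file, so with files named like "cust-data" and "order-data"
    // (hypothetical names) every record gets tagged "cust" or "order", and
    // generateGroupKey() then joins on the first comma-separated field.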

    public static class Reduce extends DataJoinReducerBase {

        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
             System.out.println("combine :");
            if (tags.length < 2) return null;
            String joinedStr = "";
            for (int i=0; i<values.length; i++) {
                if (i > 0) joinedStr += ",";
                TaggedWritable tw = (TaggedWritable) values[i];
                String line = ((Text) tw.getData()).toString();
                String[] tokens = line.split(",", 2);
                joinedStr += tokens[1];
            }
            TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
            retv.setTag((Text) tags[0]);
            return retv;
        }
    }
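    // Note: combine() is invoked once per group key with at most one tagged
    // value per data source; returning null when fewer than two sources matched
    // makes this an inner join, and the joined record concatenates the fields
    // after the key from each tagged value.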

    public static class TaggedWritable extends TaggedMapOutput {

        private Writable data;
        public TaggedWritable() {
            this.tag = new Text();
        } // empty (no-argument) constructor, used by Hadoop during deserialization

        public TaggedWritable(Writable data) {
            this.tag = new Text("");
            this.data = data;
        }

        public Writable getData() {
            return data;
        }

        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            this.data.write(out);
            System.out.println("Tag  :"+tag+" Data  :"+ data);
        }

        /* First attempt: throws NullPointerException because this.data is
           still null when the no-argument constructor was used.
        public void readFields(DataInput in) throws IOException {
            System.out.println(" Starting short readFields(): "+ in);
            this.tag.readFields(in);
            this.data.readFields(in);
        } */
        public void readFields(DataInput in) throws IOException {
            System.out.println(" Starting short readFields(): "+ in);
            this.tag.readFields(in);
            // DataInput.toString() is not a class name, so Class.forName(w)
            // fails and this.data stays null, which causes the
            // NullPointerException reported below.
            String w = in.toString();
            if (this.data == null)
                try {
                    this.data = (Writable) ReflectionUtils.newInstance(Class.forName(w), null);
                } catch (ClassNotFoundException e) {
                    e.printStackTrace();
                }
            this.data.readFields(in);
        }
    }
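    // Note on TaggedWritable above: during deserialization Hadoop builds the
    // instance reflectively through the no-argument constructor and then calls
    // readFields(), so "data" is still null at that point unless readFields()
    // recreates it before reading.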
    public int run(String[] args) throws Exception {
        System.out.println("Starting run() Method:");
        Configuration conf = getConf();
        conf.addResource(new Path("/home/vikas/project/hadoop-1.0.3/conf/core-site.xml"));
        conf.addResource(new Path("/home/vikas/project/hadoop-1.0.3/conf/mapred-site.xml"));
        conf.addResource(new Path("/home/vikas/project/hadoop-1.0.3/conf/hdfs-site.xml"));
        JobConf job = new JobConf(conf, MyJoin.class);
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);

        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setJobName("DataJoin_cust X order");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TaggedWritable.class);
        job.set("mapred.textoutputformat.separator", ",");
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Starting main() function:");
        int res = ToolRunner.run(new Configuration(),
                                 new MyJoin(),
                                 args);

        System.exit(res);
    }
}
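
For reference, the join key above comes from the part of the file name before
the first "-" and from the first comma-separated field of each record, so a
hypothetical input layout (file names and contents invented for illustration)
would look like:

cust-data:   1,Stephanie Leung,555-555-5555
order-data:  1,B,88.25,2-Jun-2008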

************************* and error *********************************************
13/01/31 23:04:26 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/01/31 23:04:26 WARN snappy.LoadSnappy: Snappy native library not loaded
13/01/31 23:04:26 INFO mapred.FileInputFormat: Total input paths to process
13/01/31 23:04:26 INFO mapred.JobClient: Running job: job_201301312254_0004
13/01/31 23:04:27 INFO mapred.JobClient:  map 0% reduce 0%
13/01/31 23:04:41 INFO mapred.JobClient:  map 66% reduce 0%
13/01/31 23:04:47 INFO mapred.JobClient:  map 100% reduce 0%
13/01/31 23:04:50 INFO mapred.JobClient:  map 100% reduce 22%
13/01/31 23:04:58 INFO mapred.JobClient: Task Id : attempt_201301312254_0004_r_000000_0, Status : FAILED
java.lang.NullPointerException
    at MyJoin$TaggedWritable.readFields(MyJoin.java:125)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(Writabl
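
The trace points at the second readFields(): Hadoop instantiates TaggedWritable
through the no-argument constructor, so "data" is null when readFields() runs,
and DataInput.toString() does not return a loadable class name, so "data" is
never created before data.readFields(in) is called. A minimal sketch of one
common fix (serializing the concrete class name alongside the data; the layout
is my own, not from the original post):

        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            // Record the concrete Writable class so readFields() can recreate it.
            out.writeUTF(this.data.getClass().getName());
            this.data.write(out);
        }

        public void readFields(DataInput in) throws IOException {
            this.tag.readFields(in);
            String dataClass = in.readUTF();
            try {
                if (this.data == null) {
                    // Build an empty instance of the recorded class, then fill it.
                    this.data = (Writable) ReflectionUtils.newInstance(
                            Class.forName(dataClass), null);
                }
                this.data.readFields(in);
            } catch (ClassNotFoundException e) {
                throw new IOException("Cannot create data instance", e);
            }
        }

With the class name written into the stream first, the empty instance created
during deserialization can rebuild "data" before delegating to its readFields().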