Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB
 Search Hadoop and all its subprojects:

Switch to Threaded View
Hadoop >> mail # dev >> Reduce side join - Hadoop default - error in combiner


Copy link to this message
-
RE: Reduce side join - Hadoop default - error in combiner
Hi Lopez,

   You need to have default constructor for TaggedWritable because while deserializing it creates instance using default constructor and calls the readFields() on it.
Thanks
Devaraj

________________________________________
From: LopezGG [[EMAIL PROTECTED]]
Sent: Wednesday, April 18, 2012 8:19 AM
To: [EMAIL PROTECTED]
Subject: Reduce side join - Hadoop default - error in combiner

I am using the folllowing code to do the reduce side join
    /*
     * HadoopMapper.java
     *
     * Created on Apr 8, 2012, 5:39:51 PM
     */
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    // import org.apache.commons.logging.Log;
    // import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.contrib.utils.join.*;

    /**
     *
     * @author
     */
    public class DataJoin extends Configured implements Tool
        {
            public static class MapClass extends DataJoinMapperBase
                {
                    protected Text generateInputTag(String inputFile)
                        {
                            String datasource = inputFile.split("-")[0];
                            return new Text(datasource);
                        }
                protected Text generateGroupKey(TaggedMapOutput aRecord)
                    {
                        String line = ((Text) aRecord.getData()).toString();
                        String[] tokens = line.split(",");
                        String groupKey = tokens[0];
                        return new Text(groupKey);
                    }
                protected TaggedMapOutput generateTaggedMapOutput(Object
value)
                    {
                        TaggedWritable retv = new TaggedWritable((Text)
value);
                        retv.setTag(this.inputTag);
                        return retv;
                    }
                }
            public static class Reduce extends DataJoinReducerBase
                {
                    protected TaggedMapOutput combine(Object[] tags,
Object[] values)
                        {
                            if (tags.length < 2) return null;
                            String joinedStr = "";
                            for (int i=0; i<values.length; i++)
                            {
                                if (i > 0) joinedStr += ",";
                                TaggedWritable tw = (TaggedWritable)
values[i];
                                String line = ((Text)
tw.getData()).toString();
                                String[] tokens = line.split(",", 2);
                                joinedStr += tokens[1];
                            }
                            TaggedWritable retv = new TaggedWritable(new
Text(joinedStr));
                            retv.setTag((Text) tags[0]);
                            return retv;
                        }
                }
            public static class TaggedWritable extends TaggedMapOutput
                {
                    private Writable data;
                    public TaggedWritable(Writable data)
                        {
                            this.tag = new Text("");
                            this.data = data;
                        }

                    public Writable getData()
                        {
                            return data;
                        }
                    public void write(DataOutput out) throws IOException
                        {
                            this.tag.write(out);
                            this.data.write(out);
                        }
                    public void readFields(DataInput in) throws IOException
                        {
                            this.tag.readFields(in);
                            this.data.readFields(in);
                        }
                }
            public int run(String[] args) throws Exception
                {
                                    Configuration conf = getConf();
                    JobConf job = new JobConf(conf, DataJoin.class);
                                    String[] otherArgs = new
GenericOptionsParser(conf, args).getRemainingArgs();
                                    if (otherArgs.length != 2)
                                    {
                                      System.err.println("Usage: wordcount
<in> <out>");
                                      System.exit(2);
                                    }

                    Path in = new Path(args[0]);
                    Path out = new Path(args[1]);
                    FileInputFormat.setInputPaths(job, in);
                    FileOutputFormat.setOutputPath(job, out);
                    job.setJobName("DataJoin");
                    job.setMapperClass(MapClass.class);
                    job.setReducerClass(Reduce.class);
                    job.setInputFormat(TextInputFormat.class);
                    job.setOutputFormat(TextOutputFormat.class);
                    job.setOutputKeyClass(Text.class);
                    job.setOutputValueClass(TaggedWritable.class);
                    job.set("mapred.textoutputformat.separator", ",");
                    JobClient.runJob(job);
                    return 0;
                }
            public static void main(Str
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB