Need help with exception when mapper emits different key class from reducer
This class is a copy of the standard WordCount example with one critical
difference: instead of the mapper emitting a key of type Text, it emits a
key of type MyText, a simple subclass of Text. The reducer emits a
different subclass of Text, YourText. I set:
        job.setMapOutputKeyClass(MyText.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(YourText.class);
        job.setOutputValueClass(IntWritable.class);

which should declare these classes directly, and yet I get the following
exception using Hadoop 0.20 on a local box.

What am I doing wrong?
java.io.IOException: wrong key class: class org.systemsbiology.hadoop.CapitalWordCount$YourText is not class org.systemsbiology.hadoop.CapitalWordCount$MyText
        at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:164)
        at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:880)
        at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1201)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at org.systemsbiology.hadoop.CapitalWordCount$IntSumReducer.reduce(CapitalWordCount.java:89)
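Note that the trace runs through the combiner (Task$CombineOutputCollector and NewCombinerRunner): the job below registers IntSumReducer as the combiner as well as the reducer, so during the map-side combine it emits YourText keys where the framework expects the declared map output key class, MyText. A combiner's output key/value classes must match the map output classes, since its output is written back into the intermediate shuffle data. A minimal sketch of a separate combiner that preserves the key type (MyTextCombiner is a hypothetical name, not part of the original post):

public static class MyTextCombiner
        extends Reducer<MyText, IntWritable, MyText, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(MyText key, Iterable<IntWritable> values,
                       Context context)
            throws IOException, InterruptedException {
        // Sum the partial counts exactly as IntSumReducer does...
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        // ...but re-emit the key as MyText, matching the declared map
        // output key class, instead of converting it to YourText.
        context.write(key, result);
    }
}

Wiring it with job.setCombinerClass(MyTextCombiner.class) in place of IntSumReducer.class should clear the mismatch; IntSumReducer can keep emitting YourText, because reducer output goes to the OutputFormat rather than back into the shuffle.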
package org.systemsbiology.hadoop;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

import java.io.*;
import java.util.*;

/**
 *  org.systemsbiology.hadoop.CapitalWordCount
 */
public class CapitalWordCount {

    public static class YourText extends Text {
        public YourText() {
        }

        /**
         * Construct from a string.
         */
        public YourText(final String string) {
            super(string);
        }
    }

    public static class MyText extends Text {
        public MyText() {
        }

        /**
         * Construct from a string.
         */
        public MyText(final String string) {
            super(string);
        }
    }
    public static class TokenizerMapper
            extends Mapper<Object, Text, MyText, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private MyText word = new MyText();

        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String s = itr.nextToken().toUpperCase();
                s = dropNonLetters(s);
                if (s.length() > 0) {
                    word.set(s);
                    context.write(word, one);
                }
            }
        }
    }

    public static String dropNonLetters(String s) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (Character.isLetter(c))
                sb.append(c);
        }

        return sb.toString();
    }

    public static class IntSumReducer
            extends Reducer<MyText, IntWritable, YourText, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(MyText key, Iterable<IntWritable> values,
                           Context context
        ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(new YourText(key.toString()), result);
        }
    }

    public static class MyPartitioner extends Partitioner<Text, IntWritable> {
        /**
         * Get the partition number for a given key (hence record) given the total
         * number of partitions, i.e. the number of reduce tasks for the job.
         *
         * <p>Typically a hash function on all or a subset of the key.</p>
         *
         * @param key           the key to be partitioned.
         * @param value         the entry value.
         * @param numPartitions the total number of partitions.
         * @return the partition number for the <code>key</code>.
         */
        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {

            String s = key.toString();
            if (s.length() == 0)
                return 0;
            char c = s.charAt(0);
            int letter = Character.toUpperCase(c) - 'A';
            // 'A'..'Z' map to 0..25; anything outside that range goes to partition 0
            if (letter < 0 || letter >= 26)
                return 0;
            return letter % numPartitions;
        }
    }
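
    // For example, with numPartitions = 14, 'A' maps to 0 and 'O' maps to
    // 14 % 14 = 0, so words starting with 'A' and 'O' share a reducer.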
    /**
     * Force loading of needed classes.
     */
    public static final Class[] NEEDED = {
                    org.apache.commons.logging.LogFactory.class,
                    org.apache.commons.cli.ParseException.class
            };
    public static final int DEFAULT_REDUCE_TASKS = 14;

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
//        if (otherArgs.length != 2) {
//            System.err.println("Usage: wordcount <in> <out>");
//            System.exit(2);
//        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(CapitalWordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
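        // Note: reusing IntSumReducer as the combiner means the combine phase
        // writes YourText keys into the intermediate data, where the framework
        // expects the map output key class (MyText) set just below.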
        job.setReducerClass(IntSumReducer.class);

        job.setMapOutputKeyClass(MyText.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(YourText.class);
        job.setOutputValueClass(IntWritable.class);
        // added Slewis
        job.setNumReduceTasks(DEFAULT_REDUCE_TASKS);
        job.setPartitionerClass(MyPartitioner.class);

        if (otherArgs.length > 1) {
            // The archived message is truncated at this point; presumably the
            // remainder matched the stock WordCount main, along these lines:
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        }
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}