Need help with exception when mapper emits different key class from reducer
Steve Lewis 2010-06-18, 18:09
This class is a copy of the standard WordCount class with one critical
exception: instead of the mapper emitting a key of type Text, it emits a key
of type MyText, a simple subclass of Text. The reducer emits a different
subclass of Text, YourText. I say
        job.setMapOutputKeyClass(MyText.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(YourText.class);
        job.setOutputValueClass(IntWritable.class);

which should declare these classes directly, and yet I get the following
exception using Hadoop 0.2 on a local box.

What am I doing wrong?
java.io.IOException: wrong key class: class org.systemsbiology.hadoop.CapitalWordCount$YourText is not class org.systemsbiology.hadoop.CapitalWordCount$MyText
    at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:164)
    at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:880)
    at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1201)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.systemsbiology.hadoop.CapitalWordCount$IntSumReducer.reduce(CapitalWordCount.java:89)
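One thing I notice in the trace: the failing write goes through
Task$CombineOutputCollector and Task$NewCombinerRunner, i.e. it happens while
IntSumReducer is running as the combiner, and a combiner's output key class
has to match the map output key class (MyText here), not the job's final
output key class. If that is the problem, a separate combiner that keeps
MyText as its output key might look like the sketch below (IntSumCombiner is
a hypothetical name of mine, not a class in my code; it would sit next to the
other inner classes in CapitalWordCount):

    // Hypothetical combiner: sums counts but re-emits the map output
    // key type (MyText), so the intermediate file writer sees the
    // class it expects.
    public static class IntSumCombiner
            extends Reducer<MyText, IntWritable, MyText, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(MyText key, Iterable<IntWritable> values,
                           Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            // re-emit the incoming key unchanged: still MyText, not YourText
            context.write(key, result);
        }
    }

registered with job.setCombinerClass(IntSumCombiner.class) in place of
IntSumReducer.class.

The full source follows.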
package org.systemsbiology.hadoop;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

import java.io.*;
import java.util.*;

/**
 *  org.systemsbiology.hadoop.CapitalWordCount
 */
public class CapitalWordCount {

    public static class YourText extends Text {
        public YourText() {
        }

        /**
         * Construct from a string.
         */
        public YourText(final String string) {
            super(string);
        }
    }

    public static class MyText extends Text {
        public MyText() {
        }

        /**
         * Construct from a string.
         */
        public MyText(final String string) {
            super(string);
        }
    }
    public static class TokenizerMapper
            extends Mapper<Object, Text, MyText, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private MyText word = new MyText();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String s = itr.nextToken().toUpperCase();
                s = dropNonLetters(s);
                if (s.length() > 0) {
                    word.set(s);
                    context.write(word, one);
                }
            }
        }
    }

    public static String dropNonLetters(String s) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (Character.isLetter(c))
                sb.append(c);
        }

        return sb.toString();
    }

    public static class IntSumReducer
            extends Reducer<MyText, IntWritable, YourText, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(MyText key, Iterable<IntWritable> values,
                           Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(new YourText(key.toString()), result);
        }
    }

    public static class MyPartitioner extends Partitioner<Text, IntWritable> {
        /**
         * Get the partition number for a given key (hence record) given the total
         * number of partitions, i.e. the number of reduce tasks for the job.
         * <p>
         * Typically a hash function on all or a subset of the key.
         *
         * @param key           the key to be partitioned.
         * @param value         the entry value.
         * @param numPartitions the total number of partitions.
         * @return the partition number for the <code>key</code>.
         */
        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {

            String s = key.toString();
            if (s.length() == 0)
                return 0;
            char c = s.charAt(0);
            int letter = Character.toUpperCase(c) - 'A';
            if (letter < 0 || letter >= 26) // 'A'..'Z' map to 0..25; everything else goes to partition 0
                return 0;
            return letter % numPartitions;
        }
    }
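
    /*
     * For what it's worth, the first-letter bucketing above can be sanity
     * checked in isolation. This throwaway snippet is illustrative only
     * (not part of the job); 14 matches DEFAULT_REDUCE_TASKS below.
     *
     *     MyPartitioner p = new MyPartitioner();
     *     // "APPLE" starts with 'A': ('A' - 'A') % 14 = 0
     *     System.out.println(p.getPartition(new Text("APPLE"), new IntWritable(1), 14));  // 0
     *     // "ZEBRA" starts with 'Z': ('Z' - 'A') % 14 = 25 % 14 = 11
     *     System.out.println(p.getPartition(new Text("ZEBRA"), new IntWritable(1), 14));  // 11
     */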
    /**
     * Force loading of needed classes to make sure they are available at runtime.
     */
    public static final Class[] NEEDED = {
            org.apache.commons.logging.LogFactory.class,
            org.apache.commons.cli.ParseException.class
    };
    public static final int DEFAULT_REDUCE_TASKS = 14;

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
//        if (otherArgs.length != 2) {
//            System.err.println("Usage: wordcount <in> <out>");
//            System.exit(2);
//        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(CapitalWordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);

        job.setMapOutputKeyClass(MyText.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(YourText.class);
        job.setOutputValueClass(IntWritable.class);
        // added Slewis
        job.setNumReduceTasks(DEFAULT_REDUCE_TASKS);
        job.setPartitionerClass(MyPartitioner.class);

        if(otherArgs.length >