Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
HBase >> mail # user >> HBase MR - key/value mismatch


Copy link to this message
-
HBase MR - key/value mismatch
I'm trying to execute a MR code over stand-alone HBase(0.94.11). I had read the HBase api and modified my MR code to read data and getting exceptions in the Reduce phase.

The exception I get is :

13/09/05 16:16:17 INFO mapred.JobClient:  map 0% reduce 0%

13/09/05 16:23:31 INFO mapred.JobClient: Task Id : attempt_201309051437_0005_m_000000_0, Status : FAILED

java.io.IOException: wrong key class: class org.apache.hadoop.hbase.io.ImmutableBytesWritable is not class org.apache.hadoop.io.Text

        at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:164)

        at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1168)

        at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1492)

        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)

        at com.hbase.mapreduce.SentimentCalculationHBaseReducer.reduce(SentimentCalculationHBaseReducer.java:199)

        at com.hbase.mapreduce.SentimentCalculationHBaseReducer.reduce(SentimentCalculationHBaseReducer.java:1)

        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)

        at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1513)

        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1436)

        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1298)

        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:699)

        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:766)

        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)

        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)

        at java.security.AccessController.doPrivileged(Native Method)

        at javax.security.auth.Subject.doAs(Subject.java:415)

        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1149)

        at org.apache.hadoop.mapred.Child.main(Child.java:249)

Providing the partial(excluding the business logic) codes
Mapper:
public class SentimentCalculationHBaseMapper extends TableMapper<Text, Text> {

private Text sentenseOriginal = new Text();

private Text sentenseParsed = new Text();

@Override

    protected void map(

            ImmutableBytesWritable key,

            Result value,

            org.apache.hadoop.mapreduce.Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)

            throws IOException, InterruptedException {

context.write(this.sentenseOriginal, this.sentenseParsed);

}

}

Reducer :
public class SentimentCalculationHBaseReducer extends

        TableReducer<Text, Text, ImmutableBytesWritable> {

@Override

    protected void reduce(

            Text key,

            java.lang.Iterable<Text> values,

            org.apache.hadoop.mapreduce.Reducer<Text, Text, ImmutableBytesWritable, org.apache.hadoop.io.Writable>.Context context)

            throws IOException, InterruptedException {

Double mdblSentimentOverall = 0.0;

String d3 = key + "@12321@" + s11.replaceFirst(":::", "")

                    + "@12321@" + mstpositiveWords + "@12321@"

                    + mstnegativeWords + "@12321@" + mstneutralWords;

            System.out.println("d3 : " + d3 + " , mdblSentimentOverall : "

                    + mdblSentimentOverall);

            Put put = new Put(d3.getBytes());

            put.add(Bytes.toBytes("word_attributes"),

                    Bytes.toBytes("mdblSentimentOverall"),

                    Bytes.toBytes(mdblSentimentOverall));

            System.out.println("Context is " + context);

            context.write(new ImmutableBytesWritable(d3.getBytes()), put);

}

}

SentimentCalculatorHBase - the Tool/main class :
package com.hbase.mapreduce;

import java.util.Calendar;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SentimentCalculatorHBase extends Configured implements Tool {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        SentimentCalculatorHBase sentimentCalculatorHBase = new SentimentCalculatorHBase();
        ToolRunner.run(sentimentCalculatorHBase, args);
    }

    @Override
    public int run(String[] arg0) throws Exception {
        // TODO Auto-generated method stub
        System.out
                .println("***********************Configuration started***********************");
        Configuration configuration = getConf();
        System.out.println("Conf: " + configuration);
        Job sentiCalcJob = new Job(configuration, "HBase SentimentCalculation");

        sentiCalcJob.setJarByClass(SentimentCalculatorHBase.class);
        sentiCalcJob.setMapperClass(SentimentCalculationHBaseMapper.class);
        sentiCalcJob.setCombinerClass(SentimentCalculationHBaseReducer.class);
        sentiCalcJob.setReducerClass(SentimentCalculationHBaseReducer.class);
        sentiCalcJob.setInputFormatClass(TableInputFormat.class);
        sentiCalcJob.setOutputFormatClass(TableOutputFormat.class);

        /* Start : Added out of exasperation! */
        sentiCalcJob.setOutputKeyClass(ImmutableBytesWritable.class);
        sentiCalcJob.setOutputValueClass(Put.class);
        /* End : Added out of exasperation! */

        Scan twitterdataUserScan = new Scan();
        twitterdataUserScan.setCaching(500);

        twitterdataUs