HBase user mailing list

Why doesn't reduce work with HBase as the input?
Dear all,

I use HBase as the data source and HDFS as the data sink. I wrote the
following program, which counts the versions of each cell:

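import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// The map side alone works: it already returns 3 versions per cell
public class ReturnMore {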

   public static class Map extends TableMapper<Text, Text> {
      private Text outkey = new Text();
      private Text outval = new Text();

      @Override
      public void map(ImmutableBytesWritable key, Result values, Context context)
            throws IOException, InterruptedException {
         // One KeyValue per stored version, because the scan asks for all versions
         for (KeyValue kv : values.raw()) {
            String rowKey = Bytes.toString(kv.getRow());
            String colFam = Bytes.toString(kv.getFamily());
            String col = Bytes.toString(kv.getQualifier());
            String val = Bytes.toString(kv.getValue());
            // output key: row/family/qualifier; output value: timestamp/value
            outkey.set(rowKey + "/" + colFam + "/" + col);
            outval.set(Long.toString(kv.getTimestamp()) + "/" + val);
            context.write(outkey, outval);
         }
      }
   }

   // The desired output is key + 3 (the number of versions of the cell)
   public static class Reduce extends Reducer<Text, Text, Text, IntWritable> {
      private IntWritable occ = new IntWritable();

      public void reduce(Text key, Iterator<Text> values, Context context)
            throws IOException, InterruptedException {
         int i = 0;
         while (values.hasNext()) {
            values.next(); // consume the value; without next() this loop never ends
            i++;
         }
         occ.set(i);
         context.write(key, occ);
      }
   }

   public static void main(String[] args) throws Exception{
      Configuration conf = HBaseConfiguration.create();
      Job job = new Job(conf);
      job.setJarByClass(ReturnMore.class);
      Scan scan = new Scan();
      scan.setMaxVersions(); // return every stored version, not only the latest
      job.setReducerClass(Reduce.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      job.setNumReduceTasks(1);
      // Also sets the mapper class and the map output key/value classes (Text/Text)
      TableMapReduceUtil.initTableMapperJob("test", scan, Map.class,
            Text.class, Text.class, job);
      FileSystem file = FileSystem.get(conf);
      Path path = new Path("/hans/test/");
      // Remove the output directory left over from a previous run
      if (file.exists(path)) {
         file.delete(path, true);
      }
      FileOutputFormat.setOutputPath(job, path);
      System.exit(job.waitForCompletion(true)?0:1);
   }
}

I tested this code on both HBase 0.92.1 and 0.94. However, whenever I run
it, the output always contains the raw content of each cell instead of
what my reduce function should produce (the key plus the number of
versions of each cell). Can anyone give me some advice? By the way, I am
running it in pseudo-distributed mode.
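A plausible explanation, offered as a sketch rather than a confirmed diagnosis: in the new `org.apache.hadoop.mapreduce` API the framework calls `Reducer.reduce(KEYIN key, Iterable<VALUEIN> values, Context context)`, whose default body writes every value out unchanged. A method declared with `Iterator<Text>` is an overload, not an override, so the framework never calls it and the job emits the mapper output as-is. The plain-Java program below imitates that dispatch without Hadoop; `MiniReducer`, `IteratorCounter`, `IterableCounter` and `runJob` are hypothetical stand-ins, not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReduceDispatchDemo {

    /** Hypothetical stand-in for Hadoop's Reducer; the "framework" only calls this signature. */
    public static class MiniReducer {
        // Default behaviour, like Hadoop's Reducer.reduce: pass every value through unchanged
        protected void reduce(String key, Iterable<String> values, List<String> out) {
            for (String value : values) {
                out.add(key + "\t" + value);
            }
        }
    }

    /** Same shape as the question's Reduce: an Iterator parameter makes this an overload. */
    public static class IteratorCounter extends MiniReducer {
        public void reduce(String key, Iterator<String> values, List<String> out) {
            int i = 0;
            while (values.hasNext()) {
                values.next();
                i++;
            }
            out.add(key + "\t" + i);
        }
    }

    /** An Iterable parameter really overrides the base method, so it is the one called. */
    public static class IterableCounter extends MiniReducer {
        @Override
        protected void reduce(String key, Iterable<String> values, List<String> out) {
            int i = 0;
            for (String ignored : values) {
                i++;
            }
            out.add(key + "\t" + i);
        }
    }

    /** Plays the framework's role: one reduce call per grouped key, via the base-class signature. */
    public static List<String> runJob(MiniReducer reducer, Map<String, List<String>> grouped) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            reducer.reduce(e.getKey(), e.getValue(), out);
        }
        return out;
    }

    public static void main(String[] args) {
        // Values are "timestamp/value" strings, as the question's mapper emits them
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        grouped.put("row1/cf/col", Arrays.asList("300/c", "200/b", "100/a"));
        System.out.println("Iterator overload: " + runJob(new IteratorCounter(), grouped));
        System.out.println("Iterable override: " + runJob(new IterableCounter(), grouped));
    }
}
```

With `IteratorCounter` the three timestamp/value pairs come out unchanged, which matches the symptom described above; with `IterableCounter` the output is `row1/cf/col` followed by the count 3. In the real job the corresponding change would presumably be declaring `reduce(Text key, Iterable<Text> values, Context context)` and marking it `@Override`, so the compiler rejects any signature that does not actually override.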

Regards,

Yong
yonghu 2012-11-16, 12:53