Why doesn't reduce work with HBase?
yonghu 2012-11-15, 19:42
Dear all,
I use HBase as the data source and HDFS as the data sink. I wrote the following program, which counts the versions of each cell:

```java
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReturnMore {

    // The map side alone works fine: it already returns 3 versions.
    public static class Map extends TableMapper<Text, Text> {
        private Text outkey = new Text();
        private Text outval = new Text();

        public void map(ImmutableBytesWritable key, Result values, Context context) {
            KeyValue[] kvs = values.raw();
            String row_key;
            String col_fam;
            String col;
            String val;
            String finalKey;
            String finalValue;
            for (KeyValue kv : kvs) {
                row_key = Bytes.toString(kv.getRow());
                col_fam = Bytes.toString(kv.getFamily());
                col = Bytes.toString(kv.getQualifier());
                val = Bytes.toString(kv.getValue());
                // Key: row/family/qualifier; value: timestamp/cell value
                finalKey = row_key + "/" + col_fam + "/" + col;
                finalValue = new Long(kv.getTimestamp()).toString() + "/" + val;
                outkey.set(finalKey);
                outval.set(finalValue);
                try {
                    context.write(outkey, outval);
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, IntWritable> {
        // The desired output is key + 3.
        private IntWritable occ = new IntWritable();

        public void reduce(Text key, Iterator<Text> values, Context context) {
            int i = 0;
            while (values.hasNext()) {
                i++;
            }
            occ.set(i);
            try {
                context.write(key, occ);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf);
        job.setJarByClass(ReturnMore.class);
        Scan scan = new Scan();
        scan.setMaxVersions(); // return all versions of each cell
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(1);
        TableMapReduceUtil.initTableMapperJob("test", scan, Map.class,
                Text.class, Text.class, job);
        // Delete the output directory if it already exists
        FileSystem file = FileSystem.get(conf);
        Path path = new Path("/hans/test/");
        if (file.exists(path)) {
            file.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

I tested this code with both HBase 0.92.1 and 0.94. However, whenever I run it, it outputs the content of each cell instead of the output defined in my reduce function (the key plus the number of occurrences for each cell). Can anyone give me some advice? By the way, I am running it in pseudo-distributed mode.

Regards!
Yong
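A likely cause, judging from the posted code: `Reduce.reduce` takes an `Iterator<Text>`, while `org.apache.hadoop.mapreduce.Reducer` declares `reduce(KEYIN, Iterable<VALUEIN>, Context)`. The posted method is therefore an overload, not an override, so Hadoop calls the inherited `reduce`, which writes every (key, value) pair through unchanged. That would match the symptom of seeing each cell's content in the output. (The `while` loop also never calls `values.next()`, so it would not terminate even if it were called.) The plain-Java sketch below reproduces the pitfall without Hadoop; `BaseReducer`, `WrongReducer`, `CountingReducer` and `run` are hypothetical stand-ins, not Hadoop classes.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class OverrideDemo {

    // Hypothetical stand-in for Hadoop's Reducer: the default reduce()
    // writes every value through unchanged (identity behavior).
    static class BaseReducer {
        final StringBuilder out = new StringBuilder();

        void reduce(String key, Iterable<String> values) {
            for (String v : values) {
                out.append(key).append('\t').append(v).append('\n');
            }
        }

        // Stand-in for the framework: it always calls the Iterable version.
        String run(String key, List<String> values) {
            reduce(key, values);
            return out.toString();
        }
    }

    // Mirrors the posted code: Iterator instead of Iterable, so this is an
    // overload that run() never calls. (Unlike the posted loop, this one
    // calls next(), so it would terminate if it were ever invoked.)
    static class WrongReducer extends BaseReducer {
        void reduce(String key, Iterator<String> values) {
            int i = 0;
            while (values.hasNext()) {
                values.next();
                i++;
            }
            out.append(key).append('\t').append(i).append('\n');
        }
    }

    // Same parameter types as the base method, so this overrides it;
    // @Override makes the compiler reject a mismatched signature.
    static class CountingReducer extends BaseReducer {
        @Override
        void reduce(String key, Iterable<String> values) {
            int i = 0;
            for (String ignored : values) {
                i++;
            }
            out.append(key).append('\t').append(i).append('\n');
        }
    }

    public static void main(String[] args) {
        List<String> versions = Arrays.asList("3/c", "2/b", "1/a");
        // Prints the three versions unchanged, one per line
        System.out.print(new WrongReducer().run("row1/cf/col", versions));
        // Prints "row1/cf/col<TAB>3"
        System.out.print(new CountingReducer().run("row1/cf/col", versions));
    }
}
```

Applied to the real job, the corresponding change would be to declare `public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException`, annotate it with `@Override`, and count the values with a for-each loop. With `@Override` in place, the original `Iterator<Text>` signature would have been a compile error instead of a silently ignored method.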
yonghu 2012-11-16, 12:53