HBase user mailing list: MapReduce job with mixed data sources: HBase table and HDFS files


S. Zhou 2013-07-03, 04:34
Azuryy Yu 2013-07-03, 05:06
S. Zhou 2013-07-03, 15:34
Michael Segel 2013-07-03, 21:19
Azuryy Yu 2013-07-04, 01:02
Ted Yu 2013-07-04, 04:29
S. Zhou 2013-07-04, 03:41
Michael Segel 2013-07-05, 21:06
S. Zhou 2013-07-10, 17:15
Ted Yu 2013-07-10, 17:21
S. Zhou 2013-07-10, 17:55
Ted Yu 2013-07-10, 18:21
S. Zhou 2013-07-11, 22:44
Ted Yu 2013-07-11, 22:51
S. Zhou 2013-07-12, 04:49
Re: MapReduce job with mixed data sources: HBase table and HDFS files
Did you use org.apache.hadoop.mapreduce.lib.input.MultipleInputs or the one
from org.apache.hadoop.mapred.lib?

Which Hadoop version do you use?

Cheers
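
(The distinction matters: the two MultipleInputs classes have incompatible
signatures, and each accepts only mappers written for its own API. A minimal
sketch of both forms, with a hypothetical MyMapper and pre-built jobConf/job
objects, to show why they cannot be mixed:

    // Old (mapred) API: takes a JobConf; MyMapper must implement
    // org.apache.hadoop.mapred.Mapper
    org.apache.hadoop.mapred.lib.MultipleInputs.addInputPath(
            jobConf, path, org.apache.hadoop.mapred.TextInputFormat.class, MyMapper.class);

    // New (mapreduce) API: takes a Job, as in the code quoted below;
    // MyMapper must extend org.apache.hadoop.mapreduce.Mapper
    org.apache.hadoop.mapreduce.lib.input.MultipleInputs.addInputPath(
            job, path, org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class, MyMapper.class);

Mixing the two APIs in one job fails at runtime, so the answer pins down
which API the rest of the driver must use.)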

On Thu, Jul 11, 2013 at 9:49 PM, S. Zhou <[EMAIL PROTECTED]> wrote:

> Thanks Ted & Azuryy. Your hint helped me solve that particular issue.
>
> But now I run into a new problem with MultipleInputs. This time I add an
> HBase table and an HDFS file as inputs (see the new code below). The
> problem is: whichever data source is added later overrides the one added
> before. For example, if I add the HBase table to MultipleInputs first and
> then the HDFS file, the final result from the reducer only contains the
> results from the HDFS file. On the other hand, if I add the HDFS file
> first and then the HBase table, the final result from the reducer only
> contains the results from the HBase table.
>
> public class MixMR {
>
>     public static class Map extends Mapper<Object, Text, Text, Text> {
>
>         public void map(Object key, Text value, Context context)
>                 throws IOException, InterruptedException {
>             String s = value.toString();
>             String[] sa = s.split(",");
>             if (sa.length == 2) {
>                 context.write(new Text(sa[0]), new Text(sa[1]));
>             }
>         }
>     }
>
>     public static class TableMap extends TableMapper<Text, Text> {
>         public static final byte[] CF = "cf".getBytes();
>         public static final byte[] ATTR1 = "c1".getBytes();
>
>         public void map(ImmutableBytesWritable row, Result value, Context context)
>                 throws IOException, InterruptedException {
>             String key = Bytes.toString(row.get());
>             String val = new String(value.getValue(CF, ATTR1));
>             context.write(new Text(key), new Text(val));
>         }
>     }
>
>     public static class Reduce extends Reducer<Object, Text, Object, Text> {
>         public void reduce(Object key, Iterable<Text> values, Context context)
>                 throws IOException, InterruptedException {
>             String ks = key.toString();
>             for (Text val : values) {
>                 context.write(new Text(ks), val);
>             }
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         Path inputPath1 = new Path(args[0]);
>         Path outputPath = new Path(args[1]);
>
>         String tableName1 = "sezhou-test";
>
>         Configuration config1 = HBaseConfiguration.create();
>         Job job1 = new Job(config1, "ExampleRead");
>         job1.setJarByClass(com.shopping.test.dealstore.MixMR.class); // class that contains mapper
>
>         Scan scan1 = new Scan();
>         scan1.setCaching(500);       // 1 is the default in Scan, which will be bad for MapReduce jobs
>         scan1.setCacheBlocks(false); // don't set to true for MR jobs
>         scan1.addFamily(Bytes.toBytes("cf"));
>
>         TableMapReduceUtil.initTableMapperJob(
>                 tableName1,      // input HBase table name
>                 scan1,           // Scan instance to control CF and attribute selection
>                 TableMap.class,  // mapper
>                 Text.class,      // mapper output key
>                 Text.class,      // mapper output value
>                 job1);
>
>         job1.setReducerClass(Reduce.class); // reducer class
>         job1.setOutputFormatClass(TextOutputFormat.class);
>
>         // the result from reducer only contains the HBase table
>         MultipleInputs.addInputPath(job1, inputPath1, TextInputFormat.class, Map.class);
>         MultipleInputs.addInputPath(job1, inputPath1, TableInputFormat.class, TableMap.class); // inputPath1 here has no effect
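
A plausible explanation for the override symptom, based on how
org.apache.hadoop.mapreduce.lib.input.MultipleInputs was implemented in
Hadoop releases of that era: addInputPath records a path-to-InputFormat and
a path-to-Mapper mapping in the job configuration, and DelegatingInputFormat
rebuilds those mappings into maps keyed by Path. Registering both sources
under the same inputPath1 therefore lets whichever call runs second silently
replace the first entry. A sketch of a possible workaround under that
assumption; the dummy path name here is made up, since TableInputFormat
reads its table name and Scan from the job configuration (already set by
initTableMapperJob above) rather than from the path:

    // Register each source under a DISTINCT path so the Path-keyed maps
    // built from the MultipleInputs configuration keep both entries.
    MultipleInputs.addInputPath(job1, inputPath1,
            TextInputFormat.class, Map.class);
    // "dummy-hbase-input" is a hypothetical placeholder; TableInputFormat
    // never opens it, but MultipleInputs needs a unique key per source.
    MultipleInputs.addInputPath(job1, new Path("dummy-hbase-input"),
            TableInputFormat.class, TableMap.class);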
S. Zhou 2013-07-12, 05:19
S. Zhou 2013-07-12, 15:49
S. Zhou 2013-07-03, 15:18
Ted Yu 2013-07-03, 04:57
S. Zhou 2013-07-03, 15:17