HBase >> mail # user >> MapReduce job with mixed data sources: HBase table and HDFS files


Re: MapReduce job with mixed data sources: HBase table and HDFS files
TextInputFormat wouldn't work:

public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

Take a look at TableInputFormatBase or the class(es) which extend it:
public abstract class TableInputFormatBase
implements InputFormat<ImmutableBytesWritable, Result> {

Cheers
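
The fix the reply above points at can be sketched like this (an illustrative sketch, not code from the thread: `FileMap`, the tab-separated input assumption, and the dummy table path are mine). Each path registered with `MultipleInputs` needs an `InputFormat` whose key/value classes match its mapper, so the HDFS path gets a plain `Mapper<LongWritable, Text, ...>` while the HBase table is registered with `TableInputFormat` and the existing `TableMap`:

```java
// Illustrative sketch only: FileMap and the dummy path are assumptions,
// not code from this thread.

// Mapper for the HDFS text input. Its input types (LongWritable, Text)
// match what TextInputFormat produces, and it emits the same (Text, Text)
// pairs as TableMap, so both inputs feed the same reducer.
public static class FileMap extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // Assumes tab-separated "key<TAB>value" lines; adjust to the data.
        String[] parts = line.toString().split("\t", 2);
        if (parts.length == 2) {
            context.write(new Text(parts[0]), new Text(parts[1]));
        }
    }
}

// In main(), instead of initTableMapperJob(...): register BOTH inputs
// through MultipleInputs. The table name and serialized Scan go into the
// Configuration; the Path given for the table input is ignored.
config.set(TableInputFormat.INPUT_TABLE, tableName1);
config.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan));
MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, FileMap.class);
MultipleInputs.addInputPath(job, new Path("dummy"), TableInputFormat.class, TableMap.class);
job.setMapOutputKeyClass(Text.class);   // set explicitly, since
job.setMapOutputValueClass(Text.class); // initTableMapperJob no longer does it
```

Note: `TableMapReduceUtil.convertScanToString` is not public in every HBase release; if it isn't in yours, the Scan has to be serialized into the configuration the same way that helper does.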

On Thu, Jul 11, 2013 at 3:44 PM, S. Zhou <[EMAIL PROTECTED]> wrote:

> Thanks very much for the help, Ted & Azurry. I wrote a very simple MR
> program that takes an HBase table as input and outputs to an HDFS file.
> Unfortunately, I ran into the following error:
>
> java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be
> cast to org.apache.hadoop.hbase.io.ImmutableBytesWritable
>
> I run pseudo-distributed Hadoop (1.2.0) and pseudo-distributed HBase
> (0.95.1-hadoop1).
>
> Here is the complete source code. An interesting thing: if I comment out
> the MultipleInputs line "MultipleInputs.addInputPath(job, inputPath1,
> TextInputFormat.class, TableMap.class);", the MR job runs fine.
>
> public class MixMR {
>
>     public static class TableMap extends TableMapper<Text, Text>  {
>         public static final byte[] CF = "cf".getBytes();
>         public static final byte[] ATTR1 = "c1".getBytes();
>
>         public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
>
>             String key = Bytes.toString(row.get());
>             String val = new String(value.getValue(CF, ATTR1));
>
>             context.write(new Text(key), new Text(val));
>         }
>     }
>
>
>     public static class Reduce extends Reducer<Object, Text, Object, Text> {
>         public void reduce(Object key, Iterable<Text> values, Context context)
>                 throws IOException, InterruptedException {
>             String ks = key.toString();
>             for (Text val : values){
>                 context.write(new Text(ks), val);
>             }
>
>         }
>     }
>
>  public static void main(String[] args) throws Exception {
>         Path inputPath1 = new Path(args[0]);
>         Path outputPath = new Path(args[1]);
>
>         String tableName1 = "test";
>
>         Configuration config = HBaseConfiguration.create();
>         Job job = new Job(config, "ExampleRead");
>         job.setJarByClass(MixMR.class);     // class that contains mapper
>
>
>         Scan scan = new Scan();
>         scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
>         scan.setCacheBlocks(false);  // don't set to true for MR jobs
>         scan.addFamily(Bytes.toBytes("cf"));
>
>         TableMapReduceUtil.initTableMapperJob(
>                 tableName1,        // input HBase table name
>                   scan,             // Scan instance to control CF and attribute selection
>                   TableMap.class,   // mapper
>                   Text.class,             // mapper output key
>                   Text.class,             // mapper output value
>                   job);
>         job.setReducerClass(Reduce.class);    // reducer class
>         job.setOutputFormatClass(TextOutputFormat.class);
>
>         // inputPath1 here has no effect for HBase table
>         MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, TableMap.class);
>
>         FileOutputFormat.setOutputPath(job, outputPath);
>
>         job.waitForCompletion(true);
>     }
> }
>
>
>
>
>
> ________________________________
>  From: Ted Yu <[EMAIL PROTECTED]>
> To: [EMAIL PROTECTED]; S. Zhou <[EMAIL PROTECTED]>
> Sent: Wednesday, July 10, 2013 11:21 AM
> Subject: Re: MapReduce job with mixed data sources: HBase table and HDFS
> files
>
>
>     conf.set(TableInputFormat.SCAN, convertScanToString(scan));
>
> is called by initTableMapperJob().
>
> Looking at the source would make it clear for you.
>
> Cheers
>
> On Wed, Jul 10, 2013 at 10:55 AM, S. Zhou <[EMAIL PROTECTED]> wrote:
>
> > Thanks Ted. I will try that. But at this time I am not sure how to call "