HBase, mail # user - Map Reduce with multiple scans


Re: Map Reduce with multiple scans
Enis Söztutar 2013-02-28, 02:57
There is a MultiTableInputFormat that has been recently added to HBase.
You might want to take a look at it.

https://github.com/apache/hbase/blob/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java
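
In a nutshell: you build one Scan per range, tag each scan with the
name of the table it should read, and pass the whole list to
TableMapReduceUtil. A minimal sketch (assuming the trunk API; "mytable"
and MyMapper, a TableMapper subclass, are placeholders):

Configuration conf = HBaseConfiguration.create();
Job job = new Job( conf, "multi-scan-example" );
job.setJarByClass( MyMapper.class );

// one scan per day; each scan carries the name of its target table
List<Scan> scans = new ArrayList<Scan>();
for ( String day : new String[] { "2013-01-07", "2013-01-08" } ) {
    Scan scan = new Scan();
    scan.setStartRow( Bytes.toBytes( day ) );
    scan.setStopRow( Bytes.toBytes( day + "|" ) ); // "|" sorts after digits and "-"
    scan.setAttribute( Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes( "mytable" ) );
    scans.add( scan );
}

// the List<Scan> overload of initTableMapperJob sets up MultiTableInputFormat
TableMapReduceUtil.initTableMapperJob( scans, MyMapper.class,
        ImmutableBytesWritable.class, Result.class, job );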

Enis
On Wed, Feb 27, 2013 at 8:17 AM, Paul van Hoven <[EMAIL PROTECTED]> wrote:

> Thanks for your answers. I ended up extending
> org.apache.hadoop.hbase.mapreduce.TableInputFormat, overriding the
> getSplits() method, and passing the class to the map reduce job the
> following way:
>
> public class TimeRangeTableInputFormat extends TableInputFormat {
>
>     @Override
>     public List<InputSplit> getSplits( JobContext context ) throws IOException {
>         try {
>             List<InputSplit> splits = new ArrayList<InputSplit>();
>             Scan scan = getScan();
>
>             // start row and stop row must be date strings as bytes,
>             // in the format 2013-01-28
>             byte[] startRow = scan.getStartRow();
>             byte[] stopRow = scan.getStopRow();
>
>             // for each date in the span, create a new scan object
>             SimpleDateFormat dateFormatter = new SimpleDateFormat( "yyyy-MM-dd" );
>             Date startDate = dateFormatter.parse( Bytes.toString( startRow ) );
>             Date endDate = dateFormatter.parse( Bytes.toString( stopRow ) );
>
>             for( Date iterDate = startDate;
>                  iterDate.compareTo( endDate ) <= 0;
>                  iterDate = Utils.addDays( iterDate, 1 ) ) {
>
>                 // the dates in the row keys are stored as md5 hashes
>                 byte[] md5Key = Utils.md5( dateFormatter.format( iterDate ) );
>                 int md5Length = 16;
>                 int longLength = 8;
>
>                 // append eight zero bytes to form the start row
>                 byte[] subStartRow = Bytes.padTail( md5Key, longLength );
>                 byte[] subEndRow   = Bytes.padTail( md5Key, longLength );
>                 // increment the last byte of the md5 prefix so the stop
>                 // row is an exclusive upper bound for this day's prefix
>                 // (note: wraps around if that byte happens to be 0xFF)
>                 subEndRow[md5Length - 1]++;
>
>                 scan.setStartRow( subStartRow );
>                 scan.setStopRow( subEndRow );
>                 setScan( scan );
>
>                 for( InputSplit subSplit : super.getSplits( context ) )
>                     splits.add( (InputSplit) ReflectionUtils.copy(
>                             context.getConfiguration(),
>                             (TableSplit) subSplit, new TableSplit() ) );
>             }
>
>             return splits;
>
>         } catch( Exception e ) {
>             // rethrow instead of returning null, which would break the job
>             throw new IOException( e );
>         }
>     }
> }
>
>
> This way I get a new scan object for every day. And although I'm using
> md5 keys as a prefix in my row keys, I can still scan ranges this way.
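>
> Wiring the class into the job looks roughly like this (a sketch, not
> tested code; "mytable" and MyMapper, a TableMapper subclass, are
> placeholders for the real table and mapper):
>
> Configuration conf = HBaseConfiguration.create();
> Job job = new Job( conf, "time-range-scan" );
> job.setJarByClass( TimeRangeTableInputFormat.class );
>
> Scan scan = new Scan();
> scan.setStartRow( Bytes.toBytes( "2013-01-01" ) );
> scan.setStopRow( Bytes.toBytes( "2013-01-28" ) );
>
> // sets up TableInputFormat and serializes the scan into the config
> TableMapReduceUtil.initTableMapperJob( "mytable", scan, MyMapper.class,
>         ImmutableBytesWritable.class, Result.class, job );
>
> // swap in the subclass that fans the scan out into one scan per day
> job.setInputFormatClass( TimeRangeTableInputFormat.class );
> job.waitForCompletion( true );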
>
> Some questions remain:
> 1. What is your opinion about this approach?
> 2. @Nick: I've read somewhere that a filter list would be less
> efficient than overriding the split method. What do you think?
>
>
> 2013/2/26 Nick Dimiduk <[EMAIL PROTECTED]>:
> > Hi Paul,
> >
> > You want to run multiple scans so that you can filter the previous scan
> > results? Am I correct in my understanding of your objective?
> >
> > First, I suggest you use the PrefixFilter [0] instead of constructing the
> > rowkey prefix manually. This looks something like:
> >
> > byte[] md5Key = Utils.md5( "2013-01-07" );
> > Scan scan = new Scan(md5Key);
> > scan.setFilter(new PrefixFilter(md5Key));
> >
> > Yes, that's a bit redundant, but setting the startkey explicitly will
> > save you some unnecessary processing.
> >
> >> This map reduce job works fine but this is just one scan job for this map
> >> reduce task. What do I have to do to pass multiple scans?