Re: Map Reduce with multiple scans
Thanks for your answers. I ended up extending
org.apache.hadoop.hbase.mapreduce.TableInputFormat, overriding the
getSplits method, and passing it to the MapReduce job the following way:

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.util.ReflectionUtils;

public class TimeRangeTableInputFormat extends TableInputFormat {

    @Override
    public List<InputSplit> getSplits( JobContext context ) throws IOException {
        try {
            List<InputSplit> splits = new ArrayList<InputSplit>();
            Scan scan = getScan();

            // start row and stop row must be date strings (as bytes) in the format 2013-01-28
            byte[] startRow = scan.getStartRow();
            byte[] stopRow  = scan.getStopRow();

            // for each date in the span, we are going to create a new scan range
            SimpleDateFormat dateFormatter = new SimpleDateFormat( "yyyy-MM-dd" );
            Date startDate = dateFormatter.parse( Bytes.toString( startRow ) );
            Date endDate   = dateFormatter.parse( Bytes.toString( stopRow ) );

            for( Date iterDate = startDate; iterDate.compareTo( endDate ) <= 0;
                 iterDate = Utils.addDays( iterDate, 1 ) ) { // Utils.addDays and Utils.md5 are my own helpers

                // the dates in the row keys are stored as md5 hashes
                byte[] md5Key = Utils.md5( dateFormatter.format( iterDate ) );
                int md5Length  = 16;
                int longLength = 8;

                // pad the day's md5 prefix with zero bytes ("0 0 0 0 0 0 0 0") for the start row;
                // for the stop row, the last byte of the prefix gets counted up
                byte[] subStartRow = Bytes.padTail( md5Key, longLength );
                byte[] subEndRow   = Bytes.padTail( md5Key, longLength );
                subEndRow[md5Length - 1]++;

                scan.setStartRow( subStartRow );
                scan.setStopRow( subEndRow );
                setScan( scan );

                // add a copy of each split produced for this day's scan range
                for( InputSplit subSplit : super.getSplits( context ) ) {
                    splits.add( (InputSplit) ReflectionUtils.copy( context.getConfiguration(),
                            (TableSplit) subSplit, new TableSplit() ) );
                }
            }

            return splits;

        } catch( Exception e ) {
            e.printStackTrace();
            return null;
        }
    }
}

This way I get a new scan object for every day, and even though I'm using
md5 hashes as the prefix in my row keys, I can still scan date ranges.
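
For completeness, the job setup then looks roughly like this (the table name
"mytable" and MyMapper are just placeholders for my actual ones; untested
sketch). The important part is swapping in the custom input format after
initTableMapperJob:

Configuration config = HBaseConfiguration.create();
Job job = new Job( config, "TimeRangeScanJob" );
job.setJarByClass( MyMapper.class );

// start and stop "rows" carry the date range that getSplits() above expands day by day
Scan scan = new Scan();
scan.setStartRow( Bytes.toBytes( "2013-01-01" ) );
scan.setStopRow( Bytes.toBytes( "2013-02-01" ) );
scan.setCaching( 500 );
scan.setCacheBlocks( false ); // don't fill the block cache from a MapReduce scan

TableMapReduceUtil.initTableMapperJob( "mytable", scan, MyMapper.class,
    Text.class, IntWritable.class, job );

// replace the default TableInputFormat with the subclass from above
job.setInputFormatClass( TimeRangeTableInputFormat.class );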

Some questions remain:
1. What is your opinion about this approach?
2. @Nick: I've read somewhere that a filter list would be less
efficient than overriding the split method. What do you think? (See the
sketch below for what I mean.)
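
Just so it's clear what I'm comparing against, the FilterList variant I had
in mind would look roughly like this (untested sketch, reusing my Utils.md5
helper):

// one scan covering several days by OR-ing PrefixFilters over the md5 date prefixes
FilterList filters = new FilterList( FilterList.Operator.MUST_PASS_ONE );
filters.addFilter( new PrefixFilter( Utils.md5( "2013-01-01" ) ) );
filters.addFilter( new PrefixFilter( Utils.md5( "2013-02-01" ) ) );

Scan scan = new Scan();
scan.setFilter( filters );

That is, a single unbounded scan where rows whose prefix doesn't match one of
the dates are filtered out, instead of one narrow scan range per day as above.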
2013/2/26 Nick Dimiduk <[EMAIL PROTECTED]>:
> Hi Paul,
>
> You want to run multiple scans so that you can filter the previous scan
> results? Am I correct in my understanding of your objective?
>
> First, I suggest you use the PrefixFilter [0] instead of constructing the
> rowkey prefix manually. This looks something like:
>
> byte[] md5Key = Utils.md5( "2013-01-07" );
> Scan scan = new Scan(md5Key);
> scan.setFilter(new PrefixFilter(md5Key));
>
> Yes, that's a bit redundant, but setting the startkey explicitly will save
> you some unnecessary processing.
>
>> This map reduce job works fine but this is just one scan job for this map
>> reduce task. What do I have to do to pass multiple scans?
>
>
> Do you mean processing on multiple dates? In that case, what you really
> want is a full (unbounded) table scan. Since date is the first part of your
> compound rowkey, there's no prefix and no need for a filter, just use new
> Scan().
>
> In general, you can use multiple filters in a given Scan (or Get). See the
> FilterList [1] for details.
>
> Does this help?
> Nick
>
> [0]: http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/filter/PrefixFilter.html
> [1]: http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/filter/FilterList.html
>
> On Tue, Feb 26, 2013 at 5:41 AM, Paul van Hoven <
> [EMAIL PROTECTED]> wrote:
>
>> My rowkeys look something like this:
>>
>> md5( date ) + md5( ip address )
>>
>> So an example would be
>> md5( "2013-02-08") + md5( "192.168.187.2")
>>
>> For one particular date I got several rows. Now I'd like to query
>> different dates, for example "2013-01-01" and "2013-02-01", and some
>> others. Additionally I'd like to perform this or these scans in a map
>> reduce job.
>>
>> Currently my map reduce job looks like this:
>>
>> Configuration config = HBaseConfiguration.create();
>> Job job = new Job(config,"ToyJob");