Re: Map Reduce with multiple scans
Enis Söztutar 2013-02-28, 02:57
There is a MultiTableInputFormat that has been recently added to HBase. You
might want to take a look at it.

https://github.com/apache/hbase/blob/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java

Enis

On Wed, Feb 27, 2013 at 8:17 AM, Paul van Hoven <[EMAIL PROTECTED]> wrote:

> Thanks for your answers. I ended up extending
> org.apache.hadoop.hbase.mapreduce.TableInputFormat, overriding the
> getSplits method, and passing it to the map reduce job the following way:
>
> public class TimeRangeTableInputFormat extends TableInputFormat {
>
>     @Override
>     public List<InputSplit> getSplits( JobContext context ) throws IOException
>     {
>         try {
>             List<InputSplit> splits = new ArrayList<InputSplit>();
>             Scan scan = getScan();
>
>             //startrow and endrow must be a string as bytes in the format 2013-01-28
>             byte startRow[] = scan.getStartRow();
>             byte stopRow[] = scan.getStopRow();
>
>             //For each date in the span, we are going to create a new scan object
>             SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd");
>             Date startDate = dateFormatter.parse( Bytes.toString( startRow ) );
>             Date endDate = dateFormatter.parse( Bytes.toString( stopRow ) );
>
>             for( Date iterDate = startDate; iterDate.compareTo(endDate) <= 0;
>                     iterDate = Utils.addDays( iterDate, 1 ) ) {
>
>                 //since the dates in the row keys are stored using md5
>                 byte[] md5Key = Utils.md5( dateFormatter.format(iterDate) );
>                 int md5Length = 16;
>                 int longLength = 8;
>
>                 byte[] subStartRow = Bytes.padTail( md5Key, longLength ); //append "0 0 0 0 0 0 0 0"
>                 byte[] subEndRow = Bytes.padTail( md5Key, longLength );
>                 subEndRow[md5Length-1]++; //last byte gets counted up
>
>                 scan.setStartRow(subStartRow);
>                 scan.setStopRow(subEndRow);
>                 setScan(scan);
>
>                 for (InputSplit subSplit : super.getSplits(context))
>                     splits.add((InputSplit) ReflectionUtils.copy( context.getConfiguration(),
>                             (TableSplit) subSplit, new TableSplit() ) );
>             }
>
>             return splits;
>
>         } catch( Exception e ) {
>             e.printStackTrace();
>             return null;
>         }
>     }
>
> }
>
> This way I get a new scan object for every day. And although I'm using
> md5 keys as a prefix in my rowkeys, I can still scan ranges this way.
>
> Some questions remain:
> 1. What is your opinion about this approach?
> 2. @Nick: I've read somewhere that a filter list would be less
> efficient than overriding the split method. What do you think?
>
>
> 2013/2/26 Nick Dimiduk <[EMAIL PROTECTED]>:
> > Hi Paul,
> >
> > You want to run multiple scans so that you can filter the previous scan
> > results? Am I correct in my understanding of your objective?
> >
> > First, I suggest you use the PrefixFilter [0] instead of constructing the
> > rowkey prefix manually. This looks something like:
> >
> > byte[] md5Key = Utils.md5( "2013-01-07" );
> > Scan scan = new Scan(md5Key);
> > scan.setFilter(new PrefixFilter(md5Key));
> >
> > Yes, that's a bit redundant, but setting the startkey explicitly will save
> > you some unnecessary processing.
> >
> >> This map reduce job works fine but this is just one scan job for this map
> >> reduce task. What do I have to do to pass multiple scans?
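
A note on the per-day key ranges in the quoted getSplits override: incrementing the last MD5 byte with subEndRow[md5Length-1]++ wraps around to 0x00 when that byte is 0xFF, which would put the stop row below the start row for that day. The following is a minimal sketch of one way to compute the ranges with a carry instead, using only the JDK. It assumes that Utils.md5 (not shown in the thread) is a plain MD5 over the UTF-8 bytes of the yyyy-MM-dd string, and that row keys are the 16-byte digest followed by an 8-byte long as described above. DayKeyRanges, nextPrefix and rangesFor are hypothetical names for illustration, not HBase API.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch: per-day [start, stop) row-key ranges for keys shaped like md5(date) + 8-byte suffix. */
class DayKeyRanges {

    // Length of the long that follows the MD5 prefix (taken from the post).
    static final int SUFFIX_LENGTH = 8;

    /** Assumption: stands in for Utils.md5 from the post, which is not shown in the thread. */
    static byte[] md5(String s) {
        try {
            return MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    /**
     * Smallest key that sorts after every key starting with the given prefix.
     * Trailing 0xFF bytes are dropped and the byte before them is incremented.
     * An all-0xFF prefix yields an empty array, meaning "no upper bound".
     */
    static byte[] nextPrefix(byte[] prefix) {
        byte[] next = Arrays.copyOf(prefix, prefix.length);
        for (int i = next.length - 1; i >= 0; i--) {
            if (next[i] != (byte) 0xFF) {
                next[i]++;
                return Arrays.copyOf(next, i + 1);
            }
        }
        return new byte[0];
    }

    /** One {startRow, stopRow} pair per day, both dates inclusive. */
    static List<byte[][]> rangesFor(LocalDate start, LocalDate endInclusive) {
        List<byte[][]> ranges = new ArrayList<>();
        for (LocalDate d = start; !d.isAfter(endInclusive); d = d.plusDays(1)) {
            byte[] prefix = md5(d.toString()); // LocalDate.toString() is yyyy-MM-dd
            // Start row: prefix padded with SUFFIX_LENGTH zero bytes, as in the post.
            byte[] startRow = Arrays.copyOf(prefix, prefix.length + SUFFIX_LENGTH);
            byte[] stopRow = nextPrefix(prefix); // exclusive upper bound
            ranges.add(new byte[][] {startRow, stopRow});
        }
        return ranges;
    }

    public static void main(String[] args) {
        List<byte[][]> ranges = rangesFor(LocalDate.of(2013, 1, 28), LocalDate.of(2013, 2, 1));
        System.out.println(ranges.size() + " day ranges");
    }
}
```

Each pair could then be applied to a fresh Scan with setStartRow and setStopRow, rather than mutating one shared Scan. An empty stop row is how HBase expresses "scan to the end of the table", so the all-0xFF case needs no special handling on the caller's side.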