Re: Map Reduce with multiple scans
Paul van Hoven 2013-02-27, 16:17
Thanks for your answers. I ended up extending
org.apache.hadoop.hbase.mapreduce.TableInputFormat, overriding the getSplits method, and passing it to the map reduce job the following way:

    public class TimeRangeTableInputFormat extends TableInputFormat {

        @Override
        public List<InputSplit> getSplits( JobContext context ) throws IOException {
            try {
                List<InputSplit> splits = new ArrayList<InputSplit>();
                Scan scan = getScan();

                // start row and stop row must be strings as bytes in the format 2013-01-28
                byte startRow[] = scan.getStartRow();
                byte stopRow[] = scan.getStopRow();

                // For each date in the span, we are going to create a new scan object
                SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd");
                Date startDate = dateFormatter.parse( Bytes.toString( startRow ) );
                Date endDate = dateFormatter.parse( Bytes.toString( stopRow ) );

                for( Date iterDate = startDate; iterDate.compareTo(endDate) <= 0; iterDate = Utils.addDays( iterDate, 1 ) ) {
                    // since the dates in the row keys are stored using md5
                    byte[] md5Key = Utils.md5( dateFormatter.format(iterDate) );
                    int md5Length = 16;
                    int longLength = 8;

                    byte[] subStartRow = Bytes.padTail( md5Key, longLength ); // append "0 0 0 0 0 0 0 0"
                    byte[] subEndRow = Bytes.padTail( md5Key, longLength );
                    subEndRow[md5Length-1]++; // last byte gets counted up

                    scan.setStartRow(subStartRow);
                    scan.setStopRow(subEndRow);
                    setScan(scan);

                    for (InputSplit subSplit : super.getSplits(context))
                        splits.add((InputSplit) ReflectionUtils.copy( context.getConfiguration(), (TableSplit) subSplit, new TableSplit() ) );
                }
                return splits;
            } catch( Exception e ) {
                e.printStackTrace();
                return null;
            }
        }
    }

This way I get a new scan object for every day. And although I'm using md5 keys as a prefix in my rowkeys, I can still scan ranges this way.

Some questions remain:
1. What is your opinion about this approach?
2. @Nick: I've read somewhere that a filter list would be less efficient than overriding the getSplits method. What do you think?

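One detail about the stop row in the code above: `subEndRow[md5Length-1]++` wraps around to 0x00 whenever the last MD5 byte is 0xFF, which would place the stop row before the start row for that day. A minimal sketch of a carry-aware alternative follows, in plain Java without any HBase dependency. The class `DayPrefixRanges` and its methods are hypothetical names, not part of HBase, and `md5` is only assumed to behave like the `Utils.md5` helper used in the post. The sketch also uses the bare prefix as the start row, since every key beginning with the prefix sorts at or after it.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not an HBase class): per-day [startRow, stopRow) ranges. */
final class DayPrefixRanges {

    private DayPrefixRanges() {}

    /** MD5 of the UTF-8 bytes of value (16 bytes); assumed equivalent to Utils.md5. */
    static byte[] md5(String value) {
        try {
            return MessageDigest.getInstance("MD5")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    /**
     * Smallest key that sorts after every key starting with prefix
     * (unsigned lexicographic byte order). Trailing 0xFF bytes are dropped
     * and the byte before them is incremented. An empty result means
     * "no upper bound", which matches how an empty stop row is treated.
     */
    static byte[] stopRowForPrefix(byte[] prefix) {
        int end = prefix.length;
        while (end > 0 && prefix[end - 1] == (byte) 0xFF) {
            end--; // this byte would overflow, so carry into the previous one
        }
        if (end == 0) {
            return new byte[0]; // prefix was empty or all 0xFF
        }
        byte[] stop = Arrays.copyOf(prefix, end);
        stop[end - 1]++; // cannot wrap: this byte is not 0xFF
        return stop;
    }

    /** One {startRow, stopRow} pair per day from start to end, both inclusive. */
    static List<byte[][]> dayRanges(LocalDate start, LocalDate end) {
        List<byte[][]> ranges = new ArrayList<>();
        for (LocalDate day = start; !day.isAfter(end); day = day.plusDays(1)) {
            // LocalDate.toString() yields the ISO form yyyy-MM-dd
            byte[] prefix = md5(day.toString());
            ranges.add(new byte[][] { prefix, stopRowForPrefix(prefix) });
        }
        return ranges;
    }
}
```

Inside getSplits, each pair could then be applied with `scan.setStartRow(range[0])` and `scan.setStopRow(range[1])` before calling `super.getSplits(context)`, in place of the padTail and increment steps.
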
2013/2/26 Nick Dimiduk <[EMAIL PROTECTED]>:
> Hi Paul,
>
> You want to run multiple scans so that you can filter the previous scan
> results? Am I correct in my understanding of your objective?
>
> First, I suggest you use the PrefixFilter [0] instead of constructing the
> rowkey prefix manually. This looks something like:
>
> byte[] md5Key = Utils.md5( "2013-01-07" );
> Scan scan = new Scan(md5Key);
> scan.setFilter(new PrefixFilter(md5Key));
>
> Yes, that's a bit redundant, but setting the startkey explicitly will save
> you some unnecessary processing.
>
>> This map reduce job works fine but this is just one scan job for this map
>> reduce task. What do I have to do to pass multiple scans?
>
> Do you mean processing on multiple dates? In that case, what you really
> want is a full (unbounded) table scan. Since date is the first part of your
> compound rowkey, there's no prefix and no need for a filter, just use new
> Scan().
>
> In general, you can use multiple filters in a given Scan (or Get). See the
> FilterList [1] for details.
>
> Does this help?
> Nick
>
> [0]:
> http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/filter/PrefixFilter.html
> [1]:
> http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/filter/FilterList.html
>
> On Tue, Feb 26, 2013 at 5:41 AM, Paul van Hoven <
> [EMAIL PROTECTED]> wrote:
>
>> My rowkeys look something like this:
>>
>> md5( date ) + md5( ip address )
>>
>> So an example would be
>> md5( "2013-02-08") + md5( "192.168.187.2")
>>
>> For one particular date I got several rows. Now I'd like to query
>> different dates, for example "2013-01-01" and "2013-02-01" and some
>> other. Additionally I'd like to perform this or these scans in a map
>> reduce job.
>>
>> Currently my map reduce job looks like this:
>>
>> Configuration config = HBaseConfiguration.create();
>> Job job = new Job(config,"ToyJob");