-
Map Reduce with multiple scans
Paul van Hoven 2013-02-26, 13:41
My rowkeys look something like this:

    md5( date ) + md5( ip address )

So an example would be

    md5( "2013-02-08") + md5( "192.168.187.2")

For one particular date I got several rows. Now I'd like to query different dates, for example "2013-01-01" and "2013-02-01" and some other. Additionally I'd like to perform this or these scans in a map reduce job.

Currently my map reduce job looks like this:

    Configuration config = HBaseConfiguration.create();
    Job job = new Job(config,"ToyJob");
    job.setJarByClass( PlayWithMapReduce.class );

    byte[] md5Key = Utils.md5( "2013-01-07" );
    int md5Length = 16;
    int longLength = 8;

    byte[] startRow = Bytes.padTail( md5Key, longLength ); //append "0 0 0 0 0 0 0 0"
    byte[] endRow = Bytes.padTail( md5Key, longLength );
    endRow[md5Length-1]++; //last byte gets counted up

    Scan scan = new Scan( startRow, endRow );
    scan.setCaching(500);
    scan.setCacheBlocks(false);

    Filter f = new SingleColumnValueFilter( Bytes.toBytes("CF"),
        Bytes.toBytes("creativeId"), CompareOp.EQUAL, Bytes.toBytes("100") );
    scan.setFilter(f);

    String tableName = "ToyDataTable";
    TableMapReduceUtil.initTableMapperJob( tableName, scan, Mapper.class,
        null, null, job);

This map reduce job works fine but this is just one scan job for this map reduce task. What do I have to do to pass multiple scans? Or do you have any other suggestions on how to achieve that goal? The constraint would be that it must be possible to combine it with map reduce.
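
A side note on the row range above: `endRow[md5Length-1]++` wraps around to 0x00 when the last digest byte happens to be 0xFF, which would put the stop row before the start row. The following is a minimal sketch of a carry-safe way to derive the stop row for a key prefix, using only the JDK. The class and method names are hypothetical, and `md5` here merely stands in for the `Utils.md5` helper from the post, whose implementation is not shown in the thread.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

final class PrefixRange {
    /** 16-byte MD5 digest of a string; an assumed stand-in for the thread's Utils.md5. */
    static byte[] md5(String s) {
        try {
            return MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // MD5 is a mandatory JDK algorithm
        }
    }

    /**
     * Smallest key that sorts after every key beginning with the prefix:
     * drop trailing 0xFF bytes, then increment the last remaining byte.
     * Returns an empty array (no upper bound) if the prefix is all 0xFF.
     */
    static byte[] stopRowForPrefix(byte[] prefix) {
        int end = prefix.length;
        while (end > 0 && prefix[end - 1] == (byte) 0xFF) {
            end--;
        }
        if (end == 0) {
            return new byte[0];
        }
        byte[] stop = Arrays.copyOf(prefix, end);
        stop[end - 1]++; // cannot wrap: this byte is known not to be 0xFF
        return stop;
    }

    /** Unsigned lexicographic byte comparison, the ordering HBase applies to row keys. */
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (d != 0) {
                return d;
            }
        }
        return a.length - b.length;
    }
}
```

With this, the start row can simply be the 16-byte prefix itself, since every row key that begins with the prefix sorts at or after it, and the zero padding is not needed.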
-
Re: Map Reduce with multiple scans
Stack 2013-02-26, 20:12
On Tue, Feb 26, 2013 at 5:41 AM, Paul van Hoven <[EMAIL PROTECTED]> wrote:

> This map reduce job works fine but this is just one scan job for this
> map reduce task. What do I have to do to pass multiple scans? Or do
> you have any other suggestions on how to achieve that goal? The
> constraint would be that it must be possible to combine it with map
> reduce.

Not sure I follow. You want to run multiple queries in the one mapreduce job? Can you have a Scan per map task? Have an input that details each query you want to do and use that as mapreduce input (line per map task?).

St.Ack
-
Re: Map Reduce with multiple scans
Nick Dimiduk 2013-02-26, 20:12
Hi Paul,

You want to run multiple scans so that you can filter the previous scan results? Am I correct in my understanding of your objective?

First, I suggest you use the PrefixFilter [0] instead of constructing the rowkey prefix manually. This looks something like:

    byte[] md5Key = Utils.md5( "2013-01-07" );
    Scan scan = new Scan(md5Key);
    scan.setFilter(new PrefixFilter(md5Key));

Yes, that's a bit redundant, but setting the startkey explicitly will save you some unnecessary processing.

> This map reduce job works fine but this is just one scan job for this map
> reduce task. What do I have to do to pass multiple scans?

Do you mean processing on multiple dates? In that case, what you really want is a full (unbounded) table scan. Since date is the first part of your compound rowkey, there's no prefix and no need for a filter, just use new Scan().

In general, you can use multiple filters in a given Scan (or Get). See the FilterList [1] for details.

Does this help?
Nick

[0]: http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/filter/PrefixFilter.html
[1]: http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/filter/FilterList.html
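
For readers weighing a full table scan against per-date scans, it may help to see how the hashed date prefix orders the keys. The sketch below (JDK only, hypothetical class and method names) prints the MD5 prefix for a run of consecutive dates in key order. It assumes the dates are hashed as plain "yyyy-MM-dd" strings in UTF-8, which the thread does not confirm. Because the prefix is a digest, key order is unrelated to calendar order, so a span of dates generally does not form one contiguous row range.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.LocalDate;
import java.util.TreeMap;

final class HashedPrefixOrder {
    /** MD5 digest as 32 lowercase hex characters. */
    static String md5Hex(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, d));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // MD5 is a mandatory JDK algorithm
        }
    }

    /**
     * Maps each date's hex key prefix to the date. A TreeMap iterates in key order,
     * and fixed-width lowercase hex sorts the same way as the underlying unsigned bytes.
     */
    static TreeMap<String, String> keyOrder(LocalDate first, int days) {
        TreeMap<String, String> byKey = new TreeMap<>();
        for (int i = 0; i < days; i++) {
            String date = first.plusDays(i).toString(); // ISO format, yyyy-MM-dd
            byKey.put(md5Hex(date), date);
        }
        return byKey;
    }

    public static void main(String[] args) {
        // Prints one line per date, sorted by key prefix rather than by date.
        keyOrder(LocalDate.of(2013, 1, 1), 7)
            .forEach((key, date) -> System.out.println(key + "  " + date));
    }
}
```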
-
Re: Map Reduce with multiple scans
Paul van Hoven 2013-02-27, 16:17
Thanks for your answers. I ended up extending org.apache.hadoop.hbase.mapreduce.TableInputFormat, overriding the getSplits method, and passing it to the map reduce job the following way:

    public class TimeRangeTableInputFormat extends TableInputFormat {

        @Override
        public List<InputSplit> getSplits( JobContext context ) throws IOException {
            try {
                List<InputSplit> splits = new ArrayList<InputSplit>();
                Scan scan = getScan();

                //startrow and endrow must be a string as bytes in the format 2013-01-28
                byte startRow[] = scan.getStartRow();
                byte stopRow[] = scan.getStopRow();

                //For each date in the span, we are going to create a new scan object
                SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd");
                Date startDate = dateFormatter.parse( Bytes.toString( startRow ) );
                Date endDate = dateFormatter.parse( Bytes.toString( stopRow ) );

                for( Date iterDate = startDate; iterDate.compareTo(endDate) <= 0;
                        iterDate = Utils.addDays( iterDate, 1 ) ) {

                    //since the dates in the row keys are stored using md5
                    byte[] md5Key = Utils.md5( dateFormatter.format(iterDate) );
                    int md5Length = 16;
                    int longLength = 8;

                    byte[] subStartRow = Bytes.padTail( md5Key, longLength ); //append "0 0 0 0 0 0 0 0"
                    byte[] subEndRow = Bytes.padTail( md5Key, longLength );
                    subEndRow[md5Length-1]++; //last byte gets counted up

                    scan.setStartRow(subStartRow);
                    scan.setStopRow(subEndRow);
                    setScan(scan);

                    for (InputSplit subSplit : super.getSplits(context))
                        splits.add((InputSplit) ReflectionUtils.copy( context.getConfiguration(),
                            (TableSplit) subSplit, new TableSplit() ) );
                }

                return splits;

            } catch( Exception e ) {
                e.printStackTrace();
                return null;
            }
        }

    }

This way I get a new scan object for every day. And although I'm using md5 keys as a prefix in my rowkeys I can still scan ranges this way.

Some questions remain:
1. What is your opinion about this approach?
2. @Nick: I've read somewhere that a filter list would be less efficient than overriding the split method. What do you think?
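
The per-day expansion in the loop above can be looked at in isolation. Here is a minimal, JDK-only sketch of that step: it turns an inclusive date span into one [start, stop) row range per day. It is not the poster's code. It swaps `SimpleDateFormat` and `Utils.addDays` for `java.time.LocalDate`, uses a carry-safe stop row instead of incrementing byte 15 directly, and all class, field, and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DailyRanges {
    /** One row range [start, stop) covering every key whose prefix is md5(date). */
    static final class Range {
        final LocalDate date;
        final byte[] start;
        final byte[] stop;

        Range(LocalDate date, byte[] start, byte[] stop) {
            this.date = date;
            this.start = start;
            this.stop = stop;
        }
    }

    /** Assumed stand-in for the thread's Utils.md5. */
    static byte[] md5(String s) {
        try {
            return MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Prefix with trailing 0xFF bytes dropped and the last remaining byte incremented. */
    static byte[] stopRowForPrefix(byte[] prefix) {
        int end = prefix.length;
        while (end > 0 && prefix[end - 1] == (byte) 0xFF) {
            end--;
        }
        byte[] stop = Arrays.copyOf(prefix, end);
        if (end > 0) {
            stop[end - 1]++;
        }
        return stop; // empty array means "no upper bound"
    }

    /** One range per calendar day from first to last, both inclusive. */
    static List<Range> forSpan(LocalDate first, LocalDate last) {
        List<Range> ranges = new ArrayList<>();
        for (LocalDate d = first; !d.isAfter(last); d = d.plusDays(1)) {
            byte[] start = md5(d.toString()); // LocalDate.toString() yields yyyy-MM-dd
            ranges.add(new Range(d, start, stopRowForPrefix(start)));
        }
        return ranges;
    }
}
```

Each `Range` would then become the start and stop row of its own scan, which is the role `scan.setStartRow` and `scan.setStopRow` play inside the loop in the post.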
-
Re: Map Reduce with multiple scans
Enis Söztutar 2013-02-28, 02:57
There is a MultiTableInputFormat that has been recently added to HBase. You might want to take a look at it.

https://github.com/apache/hbase/blob/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java

Enis
-
Re: Map Reduce with multiple scans
Asaf Mesika 2013-03-01, 14:17
Nick, if he doesn't specify startKey and endKey in the Scan object and delegates that to a Filter, this means he will send this scan to *all* regions in the system, instead of just one or two, no?
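
The concern raised here can be illustrated with a simplified model of region selection. This is a sketch under stated assumptions, not HBase's actual code: it assumes each region covers a half-open key interval [startKey, endKey), that an empty array means "unbounded", and that a scan only needs to visit regions overlapping its [startRow, stopRow) range. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class RegionPruning {
    /** A region covering [startKey, endKey); an empty array means unbounded on that side. */
    static final class Region {
        final String name;
        final byte[] startKey;
        final byte[] endKey;

        Region(String name, byte[] startKey, byte[] endKey) {
            this.name = name;
            this.startKey = startKey;
            this.endKey = endKey;
        }
    }

    /** Unsigned lexicographic byte comparison. */
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (d != 0) {
                return d;
            }
        }
        return a.length - b.length;
    }

    /** Names of the regions that overlap the scan range [startRow, stopRow). */
    static List<String> regionsFor(List<Region> regions, byte[] startRow, byte[] stopRow) {
        List<String> hit = new ArrayList<>();
        for (Region r : regions) {
            // The region begins before the scan ends (always true for an open-ended scan).
            boolean beginsBeforeStop = stopRow.length == 0 || compare(r.startKey, stopRow) < 0;
            // The region ends after the scan begins (always true for the last region).
            boolean endsAfterStart = r.endKey.length == 0 || compare(r.endKey, startRow) > 0;
            if (beginsBeforeStop && endsAfterStart) {
                hit.add(r.name);
            }
        }
        return hit;
    }
}
```

In this model, a scan with empty start and stop rows selects every region, and any filter then has to be evaluated against all of them, while a bounded scan selects only the regions its range touches.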