-
using date as key
Lior Schachter 2011-03-27, 17:00
Hi,
Last week I consulted the forum about HBase insertion optimization when the key format is date_key. This key format is very good for efficient scans, but it creates a hotspot on a single region when inserting millions of rows.

I would like to share, and get feedback on, the solution we found:
1. Insert one day. After the regions split, look at the start and end rows of the regions on each server (this is done once, to see the key distribution).
2. Before inserting a new day, programmatically create empty regions with the start and end keys from step 1 (by creating rows in the meta table).

Assuming the row-key distribution of a day does not change dramatically, the reducers can then insert into multiple regions (thus avoiding hotspotting).

Applying this method improved insert performance by a factor of 5 or so.

Lior
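A minimal sketch of step 2, written for illustration and not taken from Lior's code: given the key boundaries observed after one day's natural splits, it derives the start and end rows of the empty regions for a new day. The class name `DayRegionPlanner`, the `_` delimiter, and the choice to leave the last region open-ended (an empty end row, which HBase treats as "end of table") are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: plans region boundaries for one day from previously observed start keys. */
final class DayRegionPlanner {

    /**
     * Returns [startRow, endRow) pairs covering one day.
     * observedStartKeys are key parts WITHOUT the date prefix, taken from an earlier day's regions.
     */
    static List<String[]> plan(String date, String delimiter, String[] observedStartKeys) {
        String[] keys = observedStartKeys.clone();
        Arrays.sort(keys); // region boundaries must be in row-key order
        List<String[]> regions = new ArrayList<>();
        String start = date; // the first region of the day begins at the bare date prefix
        for (String key : keys) {
            String boundary = date + delimiter + key;
            regions.add(new String[] {start, boundary});
            start = boundary;
        }
        // Assumption: the last region stays open-ended until a later day is added.
        regions.add(new String[] {start, ""});
        return regions;
    }

    public static void main(String[] args) {
        for (String[] r : plan("20110328", "_", new String[] {"m", "f", "t"})) {
            System.out.println("[" + r[0] + ", " + (r[1].isEmpty() ? "<end of table>" : r[1]) + ")");
        }
    }
}
```

Each pair would then become one region entry. On a brand-new table, the same boundaries could instead be passed as split keys at table creation time.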
-
Re: using date as key
Cosmin Lehene 2011-03-28, 07:04
Lior,
If you already know the key distribution, you can create all the regions in advance. Are you inserting a single day or multiple days?

5X is a good improvement. Here are some more hints:

Hadoop sorts the reduce keys before the actual reduce phase. This means that if your keys start with the date, all reducers will be inserting for consecutive days. If you need to avoid hot regions and the key component of your date_key is evenly distributed among days, you can emit key_date from the mappers instead of date_key and then reassemble it correctly in the reducers. This way you'll get an even distribution of inserts across your pre-created regions.

Cosmin
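A small sketch of the key_date swap described above, written for illustration only. The class name `KeySwap` and the `_` separator are assumptions. The only requirement it relies on is that the date itself never contains the separator, so the last separator always marks where the date starts.

```java
/** Hypothetical helper showing the key_date / date_key swap around the shuffle. */
final class KeySwap {

    // Assumed delimiter; it may occur inside the key, but not inside the date.
    static final String SEP = "_";

    /** Mapper side: emit key_date so the shuffle sort is driven by the evenly distributed key part. */
    static String toShuffleKey(String date, String key) {
        return key + SEP + date;
    }

    /** Reducer side: restore date_key just before the row is written. */
    static String toRowKey(String shuffleKey) {
        int i = shuffleKey.lastIndexOf(SEP);
        String key = shuffleKey.substring(0, i);
        String date = shuffleKey.substring(i + SEP.length());
        return date + SEP + key;
    }

    public static void main(String[] args) {
        String shuffled = toShuffleKey("20110328", "user_42");
        System.out.println(shuffled + " -> " + toRowKey(shuffled));
    }
}
```

Because the shuffle sorts on the key part first, rows for different days interleave in each reducer's input rather than arriving one day at a time.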
-
Re: using date as key
Lior Schachter 2011-03-28, 07:41
Hi,
We insert a single day (about 10 million rows), but we also support inserting consecutive days.

We actually add the date to the key only in the reducer phase (the date comes from the configuration), so our mappers emit the key only. I wonder whether using the TotalOrderPartitioner would give us some more improvement. Will test it soon.

Lior
-
RE: using date as key
Michael Segel 2011-03-28, 12:11
Sorry, I'm a bit confused.

First, your title says you are using the date as the key, yet what you're really doing is using the date as part of the key. Second, you mention that you're adding the date as part of the key in a reducer stage.

What exactly is your use case? There are very few use cases where you need a reduce phase when writing to HBase.

There have been a couple of discussions on reducing the potential for hot spots. One person chastised me for saying that you really couldn't reduce a potential 'hot spot' for time series data. HBase does cache rows. In our testing our test data set wasn't large enough, so once the number of simulated users randomly fetching rows reached a certain point, you could see that rows were being returned from the cache and not from a physical I/O fetch.

When optimizing HBase, you *must* look at a specific use case.

Here's an example...

In one system, our only fetch use case for the data was a simple get(). No start/stop scans. So we hashed our key to gain even distribution. No hot spots. But this doesn't work well when you have start/stop key scans, or when we wanted to fetch records for processing that were orthogonal to our row keys. There we had to do full table scans.

One architect wanted to change the schema design because it impacted our batch processing. I tried to tell him that the batch processing didn't matter and that getting a consistent get() time was more important. Adding 15-20 minutes to a 2-3 hour batch job doesn't matter when you are designing a system that is supposed to deliver data in real time.

My point is that by looking at the use case, we will be less efficient on inserts, but more efficient on fetches where we avoid hot spots.

HTH

-Mike
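A minimal sketch of the hashed-key idea from the get()-only example above. The thread does not say which hash was used, so MD5 and the class name `HashedRowKey` are assumptions chosen for illustration. Hashing spreads consecutive natural keys across the key space, which is exactly why start/stop scans over the natural order stop working.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper: derives an evenly distributed row key from a natural key. */
final class HashedRowKey {

    /** Returns the MD5 digest of the natural key as 32 lowercase hex characters. */
    static String of(String naturalKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(naturalKey.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 support is mandated for every Java platform implementation.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Adjacent natural keys map to unrelated row keys.
        System.out.println(of("20110328_user1"));
        System.out.println(of("20110328_user2"));
    }
}
```

A reader can recompute the same row key from the natural key, so a point get() still works, but rows that were neighbours in the natural order are no longer neighbours in the table.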
-
Re: using date as key
Lior Schachter 2011-03-28, 14:06
Hi,
Let me explain our processing model:
1. We decided that our work in HBase should have day granularity (i.e. scan the rows between 20110304-20110301).
2. Once we persist a day's data in HBase, numerous scans work on this date. So we want scans to be efficient --> the key should start with the date to allow start/end key scanning.
3. We use map/reduce to aggregate data from our logs. Each aggregation is persisted as a column family in HBase (in one map/reduce job we produce a few aggregations/families). We add the date as a key prefix at the reduce stage, before inserting the row into HBase (we don't need it in the map).
4. We could use bulk loading (hence persisting the reduce result to a sequence file), but our HBase version (0.90.1) didn't support it.
5. Our main problem was that a single region was created for each day, and it took about an hour to write 10 million rows to this region.
6. The solution was to open empty regions according to the key distribution.
7. It worked. Now 10 million rows are inserted in about 15 minutes (5 machines), which is good for us.

Hope this clarifies things,

Lior
-
Re: using date as key
Weihua JIANG 2011-03-29, 01:55
Hi Lior,
I am very interested in your solution. We also need to insert a large volume of chronological data into HBase, so region splitting and copying is something we want to avoid.

Can you paste your code for the empty region creation and the .META. table manipulation? It would be very helpful to us.

Thanks
Weihua
-
Re: using date as key
Lior Schachter 2011-03-30, 15:58
Hi,
See below the code + example. It assumes that dates are lexicographically ordered (e.g. YYYYMMDD, 20100403).

The following use cases are covered:
1. Create empty regions in a new table.
2. Add empty regions to an existing table (the existing regions were created using the same code).
3. Insert empty regions in the middle (i.e. regions for 20100403 and 20100405 were created and we want to catch up on 20100404).

package com.infolinks.hbase.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.*;

/**
 * User: Lior Schachter
 * Email: [EMAIL PROTECTED]
 */
public class HBaseOperations {

    private static final Logger logger =
            Logger.getLogger(com.infolinks.hadoop.commons.hbase.HBaseOperations.class);
    private Configuration conf;
    private HTablePool pool = null;

    public HBaseOperations(Configuration conf) {
        this.conf = conf;
        pool = new HTablePool(conf, 20);
    }

    // Builds a .META. row for hri, copying the columns of originalRow when one is given.
    private Put createPut(byte[] row, HRegionInfo hri, Result originalRow) throws IOException {
        Put put = new Put(row);
        if (originalRow != null) {
            for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> cf_en : originalRow.getNoVersionMap().entrySet()) {
                byte[] cf = cf_en.getKey();
                for (Map.Entry<byte[], byte[]> c_en : cf_en.getValue().entrySet()) {
                    put.add(cf, c_en.getKey(), c_en.getValue());
                }
            }
        }
        put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, Writables.getBytes(hri));
        return put;
    }

    private HTable getMetaTable() throws IOException {
        return new HTable(conf, HConstants.META_TABLE_NAME);
    }

    // Returns the .META. row of the region the new date should follow,
    // or null when regions for this date already exist.
    private Result getMetaTableRowByDate(byte[] tableName, String date, HTable metaTable) throws IOException {
        Result row = null;
        ResultScanner s = null;
        try {
            s = metaTable.getScanner(new Scan());
            for (Result result : s) {
                byte[] infoBytes = result.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
                if (infoBytes != null) {
                    HRegionInfo info = Writables.getHRegionInfo(result.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
                    HTableDescriptor desc = info.getTableDesc();
                    if (Bytes.compareTo(desc.getName(), tableName) == 0) {
                        if (Bytes.toString(info.getStartKey()).startsWith(date) || Bytes.toString(info.getEndKey()).startsWith(date)) {
                            logger.info("Date " + date + ", already exists for table " + Bytes.toString(tableName));
                            return null;
                        }
                        if (Bytes.compareTo(info.getEndKey(), Bytes.toBytes(date)) > 0) {
                            return result;
                        }
                        row = result;
                    }
                }
            }
        } finally {
            if (s != null) {
                s.close();
            }
        }
        return row;
    }

    public int createEmptyRegions(String date, String delimiter, final HTable table, String[] startKeys) throws IOException {
        Arrays.sort(startKeys);
        HTable meta = getMetaTable();
        List<HRegionInfo> newRegions = new ArrayList<HRegionInfo>(startKeys.length);
        Result lastRow = getMetaTableRowByDate(table.getTableName(), date, meta);
        int count = 0;
        byte[] endKey = HConstants.EMPTY_BYTE_ARRAY;
        if (lastRow != null) {
            HBaseAdmin admin = new HBaseAdmin(conf);
            HRegionInfo info = (HRegionInfo) Writables.getWritable(lastRow.getColumnLatest(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER).getValue(), new HRegionInfo());
            HRegionInfo hri;
            if (Bytes.compareTo(info.getStartKey(), info.getEndKey()) == 0 && Bytes.compareTo(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY) == 0) {
                //new table
                hri = new HRegionInfo(table.getTableDescriptor(), HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes(date + delimiter + startKeys[0]), false, info.getRegionId());
                Put put = createPut(hri.getRegionName(), hri, lastRow);
                meta.put(put);
            } else if (Bytes.compareTo(HConstants.EMPTY_BYTE_ARRAY, info.getEndKey()) != 0) {
                //insert in the middle
                endKey = info.getEndKey();
                hri = new HRegionInfo(table.getTableDescriptor(), info.getStartKey(), Bytes.toBytes(date), false, info.getRegionId());
                Put put = createPut(hri.getRegionName(), hri, lastRow);
                meta.put(put);
                hri = new HRegionInfo(table.getTableDescriptor(), Bytes.toBytes(date), Bytes.toBytes(date + delimiter + startKeys[0]));
                put = createPut(hri.getRegionName(), hri, null);
                meta.put(put);
                newRegions.add(hri);
            } else {
                //new day
                hri = new HRegionInfo(table.getTableDescriptor(), info.getStartKey(), Bytes.toBytes(date), false, info.getRegionId());
                Put put = createPut(hri.getRegionName(), hri, lastRow);
                meta.put(put);
                hri = new HRegionInfo(table.getTableDescriptor(), Bytes.toBytes(date), Bytes.toBytes(date + delimiter + startKeys[0]));
                put = createPut(hri.getRegionName(), hri, null);
                meta.put(put);
                newRegions.add(hri);
            }
            for (int i = 0; i < startKeys.length; i++) {
                if