|
|
-
How can I split the data with more reducers?
Haitao Yao 2012-09-16, 02:08
Hi, I 'v encountered a problem: the job failed because of POSplit retained too much memory in the reducer. How can I specify more reducers for the spill?
Here's the screen snapshot of the Heap dump. And here's the snippet of my split script:
split RawData into AURawData if type == 2, NURawData if type == 1, InRawData if type == 9, GCData if type == 61, HCData if type == 71, TutorialRawData if type == 3 or t ype == 15;
There's 3 similar split clause in my script. The reducer count is always 1. How can I increase it?
Thanks.
Haitao Yao [EMAIL PROTECTED] weibo: @haitao_yao Skype: haitao.yao.final
-
Re: How can I split the data with more reducers?
Dmitriy Ryaboy 2012-09-16, 02:39
That looks like a mapper, not a reducer. What's the script doing?
Dmitriy
On Sat, Sep 15, 2012 at 7:08 PM, Haitao Yao <[EMAIL PROTECTED]> wrote:
> Hi, > I 'v encountered a problem: the job failed because of POSplit retained too > much memory in the reducer. How can I specify more reducers for the spill? > > Here's the screen snapshot of the Heap dump. > > > And here's the snippet of my split script: > > split RawData into AURawData if type == 2, NURawData if type == 1, > InRawData if type == 9, GCData if type == 61, HCData if type == 71, > TutorialRawData if type == 3 or t ype == 15; > > There's 3 similar split clause in my script. The reducer count is always > 1. How can I increase it? > > Thanks. > > > > Haitao Yao > [EMAIL PROTECTED] > weibo: @haitao_yao > Skype: haitao.yao.final > >
-
Re: How can I split the data with more reducers?
Haitao Yao 2012-09-16, 02:50
No, I also thought it is a mapper , but It surely is a reducer. all the mappers succeeded and the reducer failed.
Haitao Yao [EMAIL PROTECTED] weibo: @haitao_yao Skype: haitao.yao.final
On 2012-9-16, at 上午10:08, Haitao Yao wrote:
> Hi, > I 'v encountered a problem: the job failed because of POSplit retained too much memory in the reducer. How can I specify more reducers for the spill? > > Here's the screen snapshot of the Heap dump. > <aa.jpg> > > > And here's the snippet of my split script: > > split RawData into AURawData if type == 2, NURawData if type == 1, InRawData if type == 9, GCData if type == 61, HCData if type == 71, TutorialRawData if type == 3 or t ype == 15; > > There's 3 similar split clause in my script. The reducer count is always 1. How can I increase it? > > Thanks. > > > > Haitao Yao > [EMAIL PROTECTED] > weibo: @haitao_yao > Skype: haitao.yao.final >
-
Re: How can I split the data with more reducers?
Dmitriy Ryaboy 2012-09-16, 08:41
Still would like to see the script or the explain plan..
D
On Sat, Sep 15, 2012 at 7:50 PM, Haitao Yao <[EMAIL PROTECTED]> wrote: > No, I also thought it is a mapper , but It surely is a reducer. all the mappers succeeded and the reducer failed. > > > > Haitao Yao > [EMAIL PROTECTED] > weibo: @haitao_yao > Skype: haitao.yao.final > > On 2012-9-16, at 上午10:08, Haitao Yao wrote: > >> Hi, >> I 'v encountered a problem: the job failed because of POSplit retained too much memory in the reducer. How can I specify more reducers for the spill? >> >> Here's the screen snapshot of the Heap dump. >> <aa.jpg> >> >> >> And here's the snippet of my split script: >> >> split RawData into AURawData if type == 2, NURawData if type == 1, InRawData if type == 9, GCData if type == 61, HCData if type == 71, TutorialRawData if type == 3 or t ype == 15; >> >> There's 3 similar split clause in my script. The reducer count is always 1. How can I increase it? >> >> Thanks. >> >> >> >> Haitao Yao >> [EMAIL PROTECTED] >> weibo: @haitao_yao >> Skype: haitao.yao.final >> >
-
Re: How can I split the data with more reducers?
Haitao Yao 2012-09-16, 09:05
here's the explain result compressed.(The apache mail server does not allow big attachments.)
Haitao Yao [EMAIL PROTECTED] weibo: @haitao_yao Skype: haitao.yao.final
On 2012-9-16, at 下午4:41, Dmitriy Ryaboy wrote:
> Still would like to see the script or the explain plan.. > > D > > On Sat, Sep 15, 2012 at 7:50 PM, Haitao Yao <[EMAIL PROTECTED]> wrote: >> No, I also thought it is a mapper , but It surely is a reducer. all the mappers succeeded and the reducer failed. >> >> >> >> Haitao Yao >> [EMAIL PROTECTED] >> weibo: @haitao_yao >> Skype: haitao.yao.final >> >> On 2012-9-16, at 上午10:08, Haitao Yao wrote: >> >>> Hi, >>> I 'v encountered a problem: the job failed because of POSplit retained too much memory in the reducer. How can I specify more reducers for the spill? >>> >>> Here's the screen snapshot of the Heap dump. >>> <aa.jpg> >>> >>> >>> And here's the snippet of my split script: >>> >>> split RawData into AURawData if type == 2, NURawData if type == 1, InRawData if type == 9, GCData if type == 61, HCData if type == 71, TutorialRawData if type == 3 or t ype == 15; >>> >>> There's 3 similar split clause in my script. The reducer count is always 1. How can I increase it? >>> >>> Thanks. >>> >>> >>> >>> Haitao Yao >>> [EMAIL PROTECTED] >>> weibo: @haitao_yao >>> Skype: haitao.yao.final >>> >>
-
Re: How can I split the data with more reducers?
Haitao Yao 2012-09-16, 09:18
The map output of the first MR job is over 500MB, and only 1 reducer processes it. So OutOfMemoryError is caused.
After set the child memory to 1GB, the first job succeeded. But most of our jobs does not need that much memory. 512MB is enough if I can set the reducer to more than 1. Haitao Yao [EMAIL PROTECTED] weibo: @haitao_yao Skype: haitao.yao.final
On 2012-9-16, at 下午5:05, Haitao Yao wrote:
> here's the explain result compressed.(The apache mail server does not allow big attachments.) > <explain.tar.gz> > > > Haitao Yao > [EMAIL PROTECTED] > weibo: @haitao_yao > Skype: haitao.yao.final > > On 2012-9-16, at 下午4:41, Dmitriy Ryaboy wrote: > >> Still would like to see the script or the explain plan.. >> >> D >> >> On Sat, Sep 15, 2012 at 7:50 PM, Haitao Yao <[EMAIL PROTECTED]> wrote: >>> No, I also thought it is a mapper , but It surely is a reducer. all the mappers succeeded and the reducer failed. >>> >>> >>> >>> Haitao Yao >>> [EMAIL PROTECTED] >>> weibo: @haitao_yao >>> Skype: haitao.yao.final >>> >>> On 2012-9-16, at 上午10:08, Haitao Yao wrote: >>> >>>> Hi, >>>> I 'v encountered a problem: the job failed because of POSplit retained too much memory in the reducer. How can I specify more reducers for the spill? >>>> >>>> Here's the screen snapshot of the Heap dump. >>>> <aa.jpg> >>>> >>>> >>>> And here's the snippet of my split script: >>>> >>>> split RawData into AURawData if type == 2, NURawData if type == 1, InRawData if type == 9, GCData if type == 61, HCData if type == 71, TutorialRawData if type == 3 or t ype == 15; >>>> >>>> There's 3 similar split clause in my script. The reducer count is always 1. How can I increase it? >>>> >>>> Thanks. >>>> >>>> >>>> >>>> Haitao Yao >>>> [EMAIL PROTECTED] >>>> weibo: @haitao_yao >>>> Skype: haitao.yao.final >>>> >>> >
-
Re: How can I split the data with more reducers?
Dmitriy Ryaboy 2012-09-17, 05:01
Ok, then it's not POSplit that's holding the memory -- it does not participate in any of the reduce stages, according the the plan you attached.
To set parallelism, you can hardcode it on every operation that causes an MR boundary, with the exception of "group all" and "limit" since those by definition require a single reducer. So, you can alter your script to explicitly request parallelism to be greater than what is estimated: "join .. parallel $P", "group by .. parallel $P", "order ... parallel $P", etc.
I would recommend two things: 1) Make sure you are running the latest trunk, and have enabled in-memory aggregation ( set pig.exec.mapPartAgg true; set pig.exec.mapPartAgg.minReduction 3 ). I just made some significant improvements to Distinct's Initial phase (not requiring it to register with SpillableMemoryManager at all), and also improved in-mem aggregation performance.
2) It seems like you are doing a lot of "group, distinct the group, count" type operations. If you do have a distinct group that is very large, loading it all into ram is bound to cause problems. When the size of distinct sets is expected to be fairly high, we usually recommend a different pattern for count(distinct x):
Instead of : results = foreach (group data by country) { distinct_ids = distinct data.id; generate group as country, COUNT(distinct_ids) as num_dist, COUNT(data) as total; }
Do the following:
results_per_id = foreach (group data by (country, id)) generate flatten(group) as (country, id), COUNT(data) as num_repeats; results = foreach (group results_per_id by country) generate group as country, COUNT(results_per_id) as num_dist, SUM(results_per_id.num_repeats) as total;
This will introduce an extra MR step, but it's much more scalable when you get into millions of distincts in a single dimension.
D
On Sun, Sep 16, 2012 at 2:18 AM, Haitao Yao <[EMAIL PROTECTED]> wrote: > The map output of the first MR job is over 500MB, and only 1 reducer processes it. So OutOfMemoryError is caused. > > After set the child memory to 1GB, the first job succeeded. But most of our jobs does not need that much memory. 512MB is enough if I can set the reducer to more than 1. > > > > > Haitao Yao > [EMAIL PROTECTED] > weibo: @haitao_yao > Skype: haitao.yao.final > > On 2012-9-16, at 下午5:05, Haitao Yao wrote: > >> here's the explain result compressed.(The apache mail server does not allow big attachments.) >> <explain.tar.gz> >> >> >> Haitao Yao >> [EMAIL PROTECTED] >> weibo: @haitao_yao >> Skype: haitao.yao.final >> >> On 2012-9-16, at 下午4:41, Dmitriy Ryaboy wrote: >> >>> Still would like to see the script or the explain plan.. >>> >>> D >>> >>> On Sat, Sep 15, 2012 at 7:50 PM, Haitao Yao <[EMAIL PROTECTED]> wrote: >>>> No, I also thought it is a mapper , but It surely is a reducer. all the mappers succeeded and the reducer failed. >>>> >>>> >>>> >>>> Haitao Yao >>>> [EMAIL PROTECTED] >>>> weibo: @haitao_yao >>>> Skype: haitao.yao.final >>>> >>>> On 2012-9-16, at 上午10:08, Haitao Yao wrote: >>>> >>>>> Hi, >>>>> I 'v encountered a problem: the job failed because of POSplit retained too much memory in the reducer. How can I specify more reducers for the spill? >>>>> >>>>> Here's the screen snapshot of the Heap dump. >>>>> <aa.jpg> >>>>> >>>>> >>>>> And here's the snippet of my split script: >>>>> >>>>> split RawData into AURawData if type == 2, NURawData if type == 1, InRawData if type == 9, GCData if type == 61, HCData if type == 71, TutorialRawData if type == 3 or t ype == 15; >>>>> >>>>> There's 3 similar split clause in my script. The reducer count is always 1. How can I increase it? >>>>> >>>>> Thanks. >>>>> >>>>> >>>>> >>>>> Haitao Yao >>>>> [EMAIL PROTECTED] >>>>> weibo: @haitao_yao >>>>> Skype: haitao.yao.final >>>>> >>>> >> >
-
Re: How can I split the data with more reducers?
Haitao Yao 2012-09-17, 08:53
Thank you very much for the reply. I've checked the latest heapdump again,and you're right: the OOME is not caused by split, but DefaultDataBag. Sorry for the misleading, I've got too many heap dumps and I ' confused.
Here's the latest screen shot of the heap dump:
There's a lot of DefaultDataBag.
I've tried your suggestion: set pig.exec.mapPartAgg true; set pig.exec.mapPartAgg.minReduction 3; But the reducer count is still 1. The pig version I'm using is 0.9.2.
BTW, is "the latest trunk" stable enough for production deployment? If not, does 0.10.0 provide this optimization rule? When will 0.11 release?
Thank you again.
Haitao Yao [EMAIL PROTECTED] weibo: @haitao_yao Skype: haitao.yao.final
On 2012-9-17, at 下午1:01, Dmitriy Ryaboy wrote:
> Ok, then it's not POSplit that's holding the memory -- it does not > participate in any of the reduce stages, according the the plan you > attached. > > To set parallelism, you can hardcode it on every operation that causes > an MR boundary, with the exception of "group all" and "limit" since > those by definition require a single reducer. So, you can alter your > script to explicitly request parallelism to be greater than what is > estimated: "join .. parallel $P", "group by .. parallel $P", "order > ... parallel $P", etc. > > I would recommend two things: > 1) Make sure you are running the latest trunk, and have enabled > in-memory aggregation ( set pig.exec.mapPartAgg true; set > pig.exec.mapPartAgg.minReduction 3 ). I just made some significant > improvements to Distinct's Initial phase (not requiring it to register > with SpillableMemoryManager at all), and also improved in-mem > aggregation performance. > > 2) It seems like you are doing a lot of "group, distinct the group, > count" type operations. If you do have a distinct group that is very > large, loading it all into ram is bound to cause problems. When the > size of distinct sets is expected to be fairly high, we usually > recommend a different pattern for count(distinct x): > > Instead of : > results = foreach (group data by country) { > distinct_ids = distinct data.id; > generate group as country, COUNT(distinct_ids) as num_dist, > COUNT(data) as total; > } > > Do the following: > > results_per_id = foreach (group data by (country, id)) > generate flatten(group) as (country, id), COUNT(data) as num_repeats; > results = foreach (group results_per_id by country) > generate group as country, COUNT(results_per_id) as num_dist, > SUM(results_per_id.num_repeats) as total; > > This will introduce an extra MR step, but it's much more scalable when > you get into millions of distincts in a single dimension. > > D > > On Sun, Sep 16, 2012 at 2:18 AM, Haitao Yao <[EMAIL PROTECTED]> wrote: >> The map output of the first MR job is over 500MB, and only 1 reducer processes it. So OutOfMemoryError is caused. >> >> After set the child memory to 1GB, the first job succeeded. But most of our jobs does not need that much memory. 512MB is enough if I can set the reducer to more than 1. >> >> >> >> >> Haitao Yao >> [EMAIL PROTECTED] >> weibo: @haitao_yao >> Skype: haitao.yao.final >> >> On 2012-9-16, at 下午5:05, Haitao Yao wrote: >> >>> here's the explain result compressed.(The apache mail server does not allow big attachments.) >>> <explain.tar.gz> >>> >>> >>> Haitao Yao >>> [EMAIL PROTECTED] >>> weibo: @haitao_yao >>> Skype: haitao.yao.final >>> >>> On 2012-9-16, at 下午4:41, Dmitriy Ryaboy wrote: >>> >>>> Still would like to see the script or the explain plan.. >>>> >>>> D >>>> >>>> On Sat, Sep 15, 2012 at 7:50 PM, Haitao Yao <[EMAIL PROTECTED]> wrote: >>>>> No, I also thought it is a mapper , but It surely is a reducer. all the mappers succeeded and the reducer failed. >>>>> >>>>> >>>>> >>>>> Haitao Yao >>>>> [EMAIL PROTECTED] >>>>> weibo: @haitao_yao >>>>> Skype: haitao.yao.final >>>>> >>>>> On 2012-9-16, at 上午10:08, Haitao Yao wrote: >>>>> >>>>>> Hi, >>>>>> I 'v encountered a problem: the job failed because of POSplit retained too much memory in the reducer. How can I specify more reducers for the spill?
-
Re: How can I split the data with more reducers?
Dmitriy Ryaboy 2012-09-17, 09:07
Neat pie chart! What produces this?
Trunk is not entirely stable right now, but it's stabilizing pretty rapidly (as long as you don't go using DateTime types and Cube operations.. don't think they've been put through the paces quite yet, and will be considered experimental when 0.11 does roll out).
It would be interesting to know if we fixed the memory issue you are encountering, though.
0.10 will, I suspect, actually perform somewhat worse in your case than 0.9, due to the massive amount of distincting you appear to be doing.
Since we now see that the memory is spent on actual data, not general overhead, I think your best bet is to increase parallelism (via the "parallel" keyword) where possible, and where not possible -- say, if you are doing a "group all" -- perform distinct counting via 2 MR jobs, as I outlined earlier.
Pig 0.11 will also have an experimental option to codegen custom tuples for known schemas, which may drop the memory footprint significantly, but that's a different conversation.
Hope this helps, Dmitriy
On Mon, Sep 17, 2012 at 1:53 AM, Haitao Yao <[EMAIL PROTECTED]> wrote:
> Thank you very much for the reply. I've checked the latest heapdump > again,and you're right: the OOME is not caused by split, but > DefaultDataBag. Sorry for the misleading, I've got too many heap dumps and > I ' confused. > > Here's the latest screen shot of the heap dump: > There's a lot of DefaultDataBag. > > I've tried your suggestion: set pig.exec.mapPartAgg true; > set pig.exec.mapPartAgg.minReduction 3; But the reducer count is still 1. > The pig version I'm using is 0.9.2. > > BTW, is "the latest trunk" stable enough for production deployment? If > not, does 0.10.0 provide this optimization rule? When will 0.11 release? > > Thank you again. > > Haitao Yao > [EMAIL PROTECTED] > weibo: @haitao_yao > Skype: haitao.yao.final > > On 2012-9-17, at 下午1:01, Dmitriy Ryaboy wrote: > > Ok, then it's not POSplit that's holding the memory -- it does not > participate in any of the reduce stages, according the the plan you > attached. > > To set parallelism, you can hardcode it on every operation that causes > an MR boundary, with the exception of "group all" and "limit" since > those by definition require a single reducer. So, you can alter your > script to explicitly request parallelism to be greater than what is > estimated: "join .. parallel $P", "group by .. parallel $P", "order > ... parallel $P", etc. > > I would recommend two things: > 1) Make sure you are running the latest trunk, and have enabled > in-memory aggregation ( set pig.exec.mapPartAgg true; set > pig.exec.mapPartAgg.minReduction 3 ). I just made some significant > improvements to Distinct's Initial phase (not requiring it to register > with SpillableMemoryManager at all), and also improved in-mem > aggregation performance. > > 2) It seems like you are doing a lot of "group, distinct the group, > count" type operations. If you do have a distinct group that is very > large, loading it all into ram is bound to cause problems. When the > size of distinct sets is expected to be fairly high, we usually > recommend a different pattern for count(distinct x): > > Instead of : > results = foreach (group data by country) { > distinct_ids = distinct data.id; > generate group as country, COUNT(distinct_ids) as num_dist, > COUNT(data) as total; > } > > Do the following: > > results_per_id = foreach (group data by (country, id)) > generate flatten(group) as (country, id), COUNT(data) as num_repeats; > results = foreach (group results_per_id by country) > generate group as country, COUNT(results_per_id) as num_dist, > SUM(results_per_id.num_repeats) as total; > > This will introduce an extra MR step, but it's much more scalable when > you get into millions of distincts in a single dimension. > > D > > On Sun, Sep 16, 2012 at 2:18 AM, Haitao Yao <[EMAIL PROTECTED]> wrote: > > The map output of the first MR job is over 500MB, and only 1 reducer > processes it. So OutOfMemoryError is caused.
-
Re: How can I split the data with more reducers?
Haitao Yao 2012-09-17, 09:26
The pie chart is generated by MemoryAnalyzer( http://www.eclipse.org/mat/) from the heap dump when OOME happened. I've increased all the parallelisms and set default_parallel to 3. It does not work. Still I don't know what the first MR job compiled by Pig is doing . Only 1 reducer all the time ... Thank you. Haitao Yao [EMAIL PROTECTED] weibo: @haitao_yao Skype: haitao.yao.final On 2012-9-17, at 下午5:07, Dmitriy Ryaboy wrote: > Neat pie chart! What produces this? > > Trunk is not entirely stable right now, but it's stabilizing pretty rapidly (as long as you don't go using DateTime types and Cube operations.. don't think they've been put through the paces quite yet, and will be considered experimental when 0.11 does roll out). > > It would be interesting to know if we fixed the memory issue you are encountering, though. > > 0.10 will, I suspect, actually perform somewhat worse in your case than 0.9, due to the massive amount of distincting you appear to be doing. > > Since we now see that the memory is spent on actual data, not general overhead, I think your best bet is to increase parallelism (via the "parallel" keyword) where possible, and where not possible -- say, if you are doing a "group all" -- perform distinct counting via 2 MR jobs, as I outlined earlier. > > Pig 0.11 will also have an experimental option to codegen custom tuples for known schemas, which may drop the memory footprint significantly, but that's a different conversation. > > Hope this helps, > Dmitriy > > On Mon, Sep 17, 2012 at 1:53 AM, Haitao Yao <[EMAIL PROTECTED]> wrote: > Thank you very much for the reply. I've checked the latest heapdump again,and you're right: the OOME is not caused by split, but DefaultDataBag. Sorry for the misleading, I've got too many heap dumps and I ' confused. > > Here's the latest screen shot of the heap dump: > <aa.jpg> > There's a lot of DefaultDataBag. > > I've tried your suggestion: set pig.exec.mapPartAgg true; set pig.exec.mapPartAgg.minReduction 3; But the reducer count is still 1. > The pig version I'm using is 0.9.2. > > BTW, is "the latest trunk" stable enough for production deployment? If not, does 0.10.0 provide this optimization rule? When will 0.11 release? > > Thank you again. > > Haitao Yao > [EMAIL PROTECTED] > weibo: @haitao_yao > Skype: haitao.yao.final > > On 2012-9-17, at 下午1:01, Dmitriy Ryaboy wrote: > >> Ok, then it's not POSplit that's holding the memory -- it does not >> participate in any of the reduce stages, according the the plan you >> attached. >> >> To set parallelism, you can hardcode it on every operation that causes >> an MR boundary, with the exception of "group all" and "limit" since >> those by definition require a single reducer. So, you can alter your >> script to explicitly request parallelism to be greater than what is >> estimated: "join .. parallel $P", "group by .. parallel $P", "order >> ... parallel $P", etc. >> >> I would recommend two things: >> 1) Make sure you are running the latest trunk, and have enabled >> in-memory aggregation ( set pig.exec.mapPartAgg true; set >> pig.exec.mapPartAgg.minReduction 3 ). I just made some significant >> improvements to Distinct's Initial phase (not requiring it to register >> with SpillableMemoryManager at all), and also improved in-mem >> aggregation performance. >> >> 2) It seems like you are doing a lot of "group, distinct the group, >> count" type operations. If you do have a distinct group that is very >> large, loading it all into ram is bound to cause problems. When the >> size of distinct sets is expected to be fairly high, we usually >> recommend a different pattern for count(distinct x): >> >> Instead of : >> results = foreach (group data by country) { >> distinct_ids = distinct data.id; >> generate group as country, COUNT(distinct_ids) as num_dist, >> COUNT(data) as total; >> } >> >> Do the following: >> >> results_per_id = foreach (group data by (country, id)) >> generate flatten(group) as (country, id), COUNT(data) as num_repeats;
|
|