Pig, mail # user - How can I split the data with more reducers?


Re: How can I split the data with more reducers?
Haitao Yao 2012-09-17, 09:26
The pie chart is generated by MemoryAnalyzer(http://www.eclipse.org/mat/) from the heap dump when OOME happened.

I've increased the parallelism everywhere and set default_parallel to 3. It does not work.

I still don't know what the first MR job compiled by Pig is doing. It gets only 1 reducer all the time...
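As a minimal sketch (the load path, alias names, and schema below are hypothetical, not from the original script), this is the kind of setup being described; Pig's explain statement can be used to see what each compiled MR job is doing:

    set default_parallel 3;                                  -- default reducer count for operators that accept parallel
    data = load 'input' as (country:chararray, id:long);     -- hypothetical path and schema
    grouped = group data by country parallel 3;
    explain grouped;                                          -- prints the logical, physical, and MapReduce plans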

Thank you.

Haitao Yao
[EMAIL PROTECTED]
weibo: @haitao_yao
Skype:  haitao.yao.final

On 2012-9-17, at 5:07 PM, Dmitriy Ryaboy wrote:

> Neat pie chart! What produces this?
>
> Trunk is not entirely stable right now, but it's stabilizing pretty rapidly (as long as you don't go using the DateTime types and Cube operations -- I don't think they've been put through their paces quite yet, and they will be considered experimental when 0.11 does roll out).
>
> It would be interesting to know if we fixed the memory issue you are encountering, though.
>
> 0.10 will, I suspect, actually perform somewhat worse in your case than 0.9, due to the massive amount of distincting you appear to be doing.
>
> Since we now see that the memory is spent on actual data, not general overhead, I think your best bet is to increase parallelism (via the "parallel" keyword) where possible, and where not possible -- say, if you are doing a "group all" -- perform distinct counting via 2 MR jobs, as I outlined earlier.
>
> Pig 0.11 will also have an experimental option to codegen custom tuples for known schemas, which may drop the memory footprint significantly, but that's a different conversation.
>
> Hope this helps,
> Dmitriy
>
> On Mon, Sep 17, 2012 at 1:53 AM, Haitao Yao <[EMAIL PROTECTED]> wrote:
> Thank you very much for the reply. I've checked the latest heap dump again, and you're right: the OOME is not caused by the split, but by DefaultDataBag. Sorry for the misleading info; I have too many heap dumps and I got confused.
>
> Here's  the latest screen shot of the heap dump:
> <aa.jpg>
> There's a lot of DefaultDataBag.
>
> I've tried your suggestion (set pig.exec.mapPartAgg true; set pig.exec.mapPartAgg.minReduction 3;), but the reducer count is still 1.
> The pig version I'm using is 0.9.2.
>
> BTW, is "the latest trunk" stable enough for production deployment? If not, does 0.10.0 provide this optimization rule? When will 0.11 be released?
>
> Thank you again.
>
> Haitao Yao
> [EMAIL PROTECTED]
> weibo: @haitao_yao
> Skype:  haitao.yao.final
>
> On 2012-9-17, at 1:01 PM, Dmitriy Ryaboy wrote:
>
>> Ok, then it's not POSplit that's holding the memory -- it does not
>> participate in any of the reduce stages, according to the plan you
>> attached.
>>
>> To set parallelism, you can hardcode it on every operation that causes
>> an MR boundary, with the exception of "group all"  and "limit" since
>> those by definition require a single reducer. So, you can alter your
>> script to explicitly request parallelism to be greater than what is
>> estimated: "join .. parallel $P", "group by .. parallel $P", "order
>> ... parallel $P", etc.
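A minimal sketch of this pattern (the relation and field names are hypothetical; $P is a Pig parameter):

    -- $P is passed on the command line, e.g.: pig -p P=20 script.pig
    joined  = join clicks by user_id, users by user_id parallel $P;
    grouped = group clicks by country parallel $P;
    ordered = order clicks by ts parallel $P;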
>>
>> I would recommend two things:
>> 1) Make sure you are running the latest trunk, and have enabled
>> in-memory aggregation ( set pig.exec.mapPartAgg true; set
>> pig.exec.mapPartAgg.minReduction 3 ). I just made some significant
>> improvements to Distinct's Initial phase (not requiring it to register
>> with SpillableMemoryManager at all), and also improved in-mem
>> aggregation performance.
>>
>> 2) It seems like you are doing a lot of "group, distinct the group,
>> count" type operations. If you do have a distinct group that is very
>> large, loading it all into ram is bound to cause problems. When the
>> size of distinct sets is expected to be fairly high, we usually
>> recommend a different pattern for count(distinct x):
>>
>> Instead of :
>> results = foreach (group data by country) {
>>  distinct_ids = distinct data.id;
>>  generate group as country, COUNT(distinct_ids) as num_dist,
>> COUNT(data) as total;
>> }
>>
>> Do the following:
>>
>> results_per_id = foreach (group data by (country, id))
>>  generate flatten(group) as (country, id), COUNT(data) as num_repeats;
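As a hedged sketch (the second step is not shown in the quoted text above), the aggregation that usually completes this pattern rolls the per-(country, id) counts up to the country level, reusing the field names from the example:

    -- second MR job: one small row per (country, id), so this group is safe to aggregate
    results = foreach (group results_per_id by country)
      generate group as country,
               COUNT(results_per_id) as num_dist,
               SUM(results_per_id.num_repeats) as total;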