Pig >> mail # user >> How can I split the data with more reducers?


Haitao Yao 2012-09-16, 02:08
Haitao Yao 2012-09-16, 02:50
Dmitriy Ryaboy 2012-09-16, 08:41
Haitao Yao 2012-09-16, 09:05
Haitao Yao 2012-09-16, 09:18
Dmitriy Ryaboy 2012-09-17, 05:01
Haitao Yao 2012-09-17, 08:53

Re: How can I split the data with more reducers?
Neat pie chart! What produces this?

Trunk is not entirely stable right now, but it's stabilizing pretty rapidly
(as long as you don't go using DateTime types and Cube operations -- I don't
think they've been put through their paces quite yet, and they will be
considered experimental when 0.11 does roll out).

It would be interesting to know if we fixed the memory issue you are
encountering, though.

0.10 will, I suspect, actually perform somewhat worse in your case than
0.9, due to the massive amount of distincting you appear to be doing.

Since we now see that the memory is spent on actual data, not general
overhead, I think your best bet is to increase parallelism (via the
"parallel" keyword) where possible, and where not possible -- say, if you
are doing a "group all" -- perform distinct counting via 2 MR jobs, as I
outlined earlier.
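
As a minimal sketch of bumping reduce parallelism with the "parallel"
keyword (the relation and field names below are hypothetical, not taken
from your script):

-- "group all" and "limit" cannot take parallel, since they require a single reducer
grouped = group data by country parallel 20;  -- request 20 reducers for this group-by
results = foreach grouped generate group as country, COUNT(data) as total;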

Pig 0.11 will also have an experimental option to codegen custom tuples for
known schemas, which may drop the memory footprint significantly, but
that's a different conversation.

Hope this helps,
Dmitriy

On Mon, Sep 17, 2012 at 1:53 AM, Haitao Yao <[EMAIL PROTECTED]> wrote:

> Thank you very much for the reply. I've checked the latest heap dump
> again, and you're right: the OOME is not caused by split, but by
> DefaultDataBag. Sorry for the misleading information; I've got too many
> heap dumps and I'm confused.
>
> Here's the latest screenshot of the heap dump:
> There are a lot of DefaultDataBags.
>
> I've tried your suggestion: set pig.exec.mapPartAgg true;
> set pig.exec.mapPartAgg.minReduction 3; But the reducer count is still 1.
> The Pig version I'm using is 0.9.2.
>
> BTW, is "the latest trunk" stable enough for production deployment? If
> not, does 0.10.0 provide this optimization rule? When will 0.11 be released?
>
> Thank you again.
>
> Haitao Yao
> [EMAIL PROTECTED]
> weibo: @haitao_yao
> Skype:  haitao.yao.final
>
> On 2012-9-17, at 1:01 PM, Dmitriy Ryaboy wrote:
>
> Ok, then it's not POSplit that's holding the memory -- it does not
> participate in any of the reduce stages, according to the plan you
> attached.
>
> To set parallelism, you can hardcode it on every operation that causes
> an MR boundary, with the exception of "group all"  and "limit" since
> those by definition require a single reducer. So, you can alter your
> script to explicitly request parallelism to be greater than what is
> estimated: "join .. parallel $P", "group by .. parallel $P", "order
> ... parallel $P", etc.
>
> I would recommend two things:
> 1) Make sure you are running the latest trunk, and have enabled
> in-memory aggregation ( set pig.exec.mapPartAgg true; set
> pig.exec.mapPartAgg.minReduction 3 ). I just made some significant
> improvements to Distinct's Initial phase (not requiring it to register
> with SpillableMemoryManager at all), and also improved in-mem
> aggregation performance.
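>
> As a minimal sketch of these settings in context (the relation "data" and
> field "id" below are hypothetical):
>
> set pig.exec.mapPartAgg true;
> set pig.exec.mapPartAgg.minReduction 3;
> -- in-map aggregation applies to algebraic UDFs such as COUNT and is kept
> -- only if it shrinks map output by at least the minReduction factor
> counts = foreach (group data by id) generate group as id, COUNT(data) as cnt;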
>
> 2) It seems like you are doing a lot of "group, distinct the group,
> count" type operations. If you do have a distinct group that is very
> large, loading it all into ram is bound to cause problems. When the
> size of distinct sets is expected to be fairly high, we usually
> recommend a different pattern for count(distinct x):
>
> Instead of:
> results = foreach (group data by country) {
>   distinct_ids = distinct data.id;
>   generate group as country, COUNT(distinct_ids) as num_dist,
>     COUNT(data) as total;
> }
>
> Do the following:
>
> results_per_id = foreach (group data by (country, id))
>   generate flatten(group) as (country, id), COUNT(data) as num_repeats;
> results = foreach (group results_per_id by country)
>   generate group as country, COUNT(results_per_id) as num_dist,
>     SUM(results_per_id.num_repeats) as total;
>
> This will introduce an extra MR step, but it's much more scalable when
> you get into millions of distincts in a single dimension.
>
> D
>
> On Sun, Sep 16, 2012 at 2:18 AM, Haitao Yao <[EMAIL PROTECTED]> wrote:
>
> The map output of the first MR job is over 500MB, and only 1 reducer
> processes it, so an OutOfMemoryError occurs.
Haitao Yao 2012-09-17, 09:26
Dmitriy Ryaboy 2012-09-16, 02:39