Pig user mailing list: What is the best way to do counting in pig?


Re: What is the best way to do counting in pig?
Haitao,
Is your query using the combiner? Can you send the explain plan output?
Does the heap information say how many entries there are in the
InternalCachedBag's ArrayList?
What version of Pig are you using?
Thanks,
Thejas
On 7/10/12 11:50 PM, Haitao Yao wrote:
> Oh, a new discovery: we cannot set pig.cachedbag.memusage = 0, because
> every time the InternalCachedBag spills, it creates a new tmp file in
> java.io.tmpdir. If we set pig.cachedbag.memusage to 0, every new tuple
> added into the InternalCachedBag creates a new tmp file, and the tmp
> file is only deleted on exit.
> So, if you're unlucky like me, you will get an OOM Exception caused by
> java.io.DeleteOnExitHook!
> Here's the evidence:
>
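A minimal standalone sketch of the mechanism (not Pig code; the class name and loop bound are assumed): File.deleteOnExit() records each path in java.io.DeleteOnExitHook's internal set, and entries are never removed until the JVM shuts down, so one temp file per tuple means unbounded heap growth:

import java.io.File;
import java.io.IOException;

public class DeleteOnExitGrowth {
    public static void main(String[] args) throws IOException {
        // Each iteration mimics one spill with pig.cachedbag.memusage=0:
        // a fresh temp file registered for deletion on exit.
        for (int i = 0; i < 1000000; i++) {
            File tmp = File.createTempFile("pigbag", ".tmp");
            tmp.deleteOnExit(); // path string stays in DeleteOnExitHook until exit
            tmp.delete();       // deleting the file does NOT remove the hook entry
        }
        // DeleteOnExitHook now holds ~1M path strings; with a small heap
        // (e.g. -Xmx512M) this alone can trigger the OOM described above.
    }
}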
> God, we really need a full description of how every parameter works.
>
>
>
> Haitao Yao
> [EMAIL PROTECTED]
> weibo: @haitao_yao
> Skype:  haitao.yao.final
>
> On 2012-7-10, at 4:20, Haitao Yao wrote:
>
>> I found the solution.
>>
>> After analyzing the heap dump taken while the reducer hit the OOM, I
>> found that the memory is consumed by org.apache.pig.data.InternalCachedBag;
>> here's the diagram:
>> <cc.jpg>
>>
>> In the source code of org.apache.pig.data.InternalCachedBag, I found
>> that there's a parameter controlling the cache limit:
public InternalCachedBag(int bagCount) {
    float percent = 0.2F;

    if (PigMapReduce.sJobConfInternal.get() != null) {
        // here, the cache limit is read from the job conf!
        String usage = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.memusage");
        if (usage != null) {
            percent = Float.parseFloat(usage);
        }
    }

    init(bagCount, percent);
}

private void init(int bagCount, float percent) {
    factory = TupleFactory.getInstance();
    mContents = new ArrayList<Tuple>();

    long max = Runtime.getRuntime().maxMemory();
    maxMemUsage = (long) (((float) max * percent) / (float) bagCount);
    cacheLimit = Integer.MAX_VALUE;

    // set limit to 0, if memusage is 0 or really really small.
    // then all tuples are put into disk
    if (maxMemUsage < 1) {
        cacheLimit = 0;
    }
    log.warn("cacheLimit: " + this.cacheLimit);
    addDone = false;
}
>>
>> So, after writing pig.cachedbag.memusage=0 into
>> $PIG_HOME/conf/pig.properties, my job succeeds!
>>
>> You can also set it to an appropriate value to fully utilize your
>> memory as a cache.
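For a sense of scale, a small arithmetic sketch (all values assumed) mirroring the init() formula above:

public class CachedBagBudget {
    public static void main(String[] args) {
        long max = 512L * 1024 * 1024; // reducer heap, e.g. -Xmx512M
        float percent = 0.1F;          // pig.cachedbag.memusage=0.1 (assumed)
        int bagCount = 3;              // bags sharing the budget (assumed)

        // same formula as InternalCachedBag.init()
        long maxMemUsage = (long) (((float) max * percent) / (float) bagCount);
        System.out.println("per-bag budget: " + maxMemUsage + " bytes"); // ~17 MiB

        // with percent = 0 the budget drops below 1 byte, cacheLimit
        // becomes 0, and every tuple goes straight to disk
    }
}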
>>
>> Hope this is useful for others.
>> Thanks.
>>
>>
>> Haitao Yao
>> [EMAIL PROTECTED]
>> weibo: @haitao_yao
>> Skype:  haitao.yao.final
>>
>> On 2012-7-10, at 1:06, Haitao Yao wrote:
>>
>>> My reducers get 512 MB: -Xms512M -Xmx512M.
>>> The reducer does not get an OOM when I manually invoke spill, in my case.
>>>
>>> Can you explain more about your solution?
>>> And can your solution fit into a 512 MB reducer process?
>>> Thanks very much.
>>>
>>>
>>>
>>> Haitao Yao
>>> [EMAIL PROTECTED]
>>> weibo: @haitao_yao
>>> Skype:  haitao.yao.final
>>>
>>> On 2012-7-10, at 12:26, Jonathan Coveney wrote:
>>>
>>>> I have something in the mix that should reduce bag memory :)
>>>> Question: how much memory are your reducers getting? In my experience,
>>>> you'll get OOMs on spilling if you have allocated less than a gig to
>>>> the JVM.
>>>> 2012/7/9 Haitao Yao <[EMAIL PROTECTED]>
>>>>
>>>>> I have encountered a similar problem, and I got an OOM while
>>>>> running the reducer.
>>>>> I think the reason is that the data bag generated after the group all
>>>>> is too big to fit into the reducer's memory.
>>>>>
>>>>> I have written a new COUNT implementation that explicitly invokes
>>>>> System.gc() and spill after the COUNT function finishes its job, but
>>>>> it still gets an OOM.
>>>>>
>>>>> Here's the code of the new COUNT implementation:
>>>>>        @Override
>>>>>        public Long exec(Tuple input) throws IOException {
>>>>>                DataBag bag = (DataBag)input.get(0);
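The quoted code is cut off at this point. A minimal sketch of a COUNT-style UDF that counts, spills the bag, and hints the GC, per the description above (class name assumed; not the author's actual code):

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

public class SpillingCount extends EvalFunc<Long> {
    @Override
    public Long exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0) {
            return null;
        }
        DataBag bag = (DataBag) input.get(0);
        long count = bag.size(); // DataBag tracks its size; no iteration needed
        bag.spill();             // ask the bag to push in-memory tuples to disk
        System.gc();             // explicit GC hint, as described above
        return count;
    }
}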