Re: What is the best way to do counting in pig?
Haitao,
Is your query using the combiner? Can you send the explain plan output?
Does the heap information say how many entries are in the
InternalCachedBag's ArrayList?
What version of Pig are you using?
Thanks,
Thejas
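
[A minimal sketch of one way to capture the explain plan Thejas asks for, using the PigServer API in local mode; the LOAD/GROUP lines and the alias "counted" are hypothetical stand-ins for the actual job. From the Grunt shell, EXPLAIN counted; prints the same plans.]

    import java.io.IOException;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class ExplainPlanDump {
        public static void main(String[] args) throws IOException {
            PigServer pig = new PigServer(ExecType.LOCAL);
            pig.registerQuery("data = LOAD 'input.txt' AS (id:chararray);");
            pig.registerQuery("grouped = GROUP data ALL;");
            pig.registerQuery("counted = FOREACH grouped GENERATE COUNT(data);");
            // Prints the logical, physical, and MapReduce plans; the MR plan
            // shows whether a combiner is present.
            pig.explain("counted", System.out);
        }
    }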
On 7/10/12 11:50 PM, Haitao Yao wrote:
> Oh, a new discovery: we cannot set pig.cachedbag.memusage = 0, because
> every time the InternalCachedBag spills, it creates a new tmp file in
> java.io.tmpdir. If we set pig.cachedbag.memusage to 0, every new tuple
> added into the InternalCachedBag will create a new tmp file, and the
> tmp files are only deleted on exit.
> So, if you're unlucky like me, you will get an OOM exception caused by
> java.io.DeleteOnExitHook!
> Here's the evidence:
>
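
[A standalone sketch of the failure mode just described, nothing Pig-specific: File.deleteOnExit() records each path in java.io.DeleteOnExitHook's internal set, which is only drained when the JVM exits, so one tmp file per added tuple means one permanently retained entry per tuple.]

    import java.io.File;
    import java.io.IOException;

    public class DeleteOnExitLeakSketch {
        public static void main(String[] args) throws IOException {
            // Each iteration mimics one spill with pig.cachedbag.memusage=0:
            // create a tmp file and register it for deletion on exit.
            // Run with a small heap (e.g. -Xmx64m) to see the OOM quickly.
            for (long i = 1; ; i++) {
                File tmp = File.createTempFile("pigbag", ".tmp");
                tmp.deleteOnExit(); // entry retained in DeleteOnExitHook until exit
                tmp.delete();       // the file is gone, but the hook entry is not
                if (i % 100_000 == 0) {
                    System.out.println(i + " delete-on-exit entries retained");
                }
            }
        }
    }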
> God, we really need a full description of how every parameter works.
>
>
>
> Haitao Yao
> [EMAIL PROTECTED]
> weibo: @haitao_yao
> Skype:  haitao.yao.final
>
> On 2012-7-10, at 4:20 PM, Haitao Yao wrote:
>
>> I found the solution.
>>
>> After analyzing the heap dump while the reducer OOM, I found out the
>> memory is consumed by org.apache.pig.data.InternalCachedBag , here's
>> the diagram:
>> <cc.jpg>
>>
>> In the source code of org.apache.pig.data.InternalCachedBag, I found
>> out there's a parameter for the cache limit:
>> public InternalCachedBag(int bagCount) {
>>     float percent = 0.2F;
>>
>>     if (PigMapReduce.sJobConfInternal.get() != null) {
>>         // here, the cache limit is from here!
>>         String usage = PigMapReduce.sJobConfInternal.get()
>>                 .get("pig.cachedbag.memusage");
>>         if (usage != null) {
>>             percent = Float.parseFloat(usage);
>>         }
>>     }
>>
>>     init(bagCount, percent);
>> }
>>
>> private void init(int bagCount, float percent) {
>>     factory = TupleFactory.getInstance();
>>     mContents = new ArrayList<Tuple>();
>>
>>     long max = Runtime.getRuntime().maxMemory();
>>     maxMemUsage = (long)(((float)max * percent) / (float)bagCount);
>>     cacheLimit = Integer.MAX_VALUE;
>>
>>     // set limit to 0, if memusage is 0 or really really small.
>>     // then all tuples are put into disk
>>     if (maxMemUsage < 1) {
>>         cacheLimit = 0;
>>     }
>>     log.warn("cacheLimit: " + this.cacheLimit);
>>     addDone = false;
>> }
>>
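[To make the arithmetic above concrete, a small standalone sketch, assuming a 512 MB heap (the -Xms512M/-Xmx512M reducers mentioned further down the thread) and a hypothetical bagCount of 3.]

    public class CacheLimitMath {
        public static void main(String[] args) {
            long max = 512L * 1024 * 1024; // stand-in for Runtime.getRuntime().maxMemory()
            float percent = 0.2F;          // the default when pig.cachedbag.memusage is unset
            int bagCount = 3;              // hypothetical number of bags in the operator
            long maxMemUsage = (long) (((float) max * percent) / (float) bagCount);
            // Roughly 35 MB per bag with the defaults. With percent = 0,
            // maxMemUsage falls below 1, cacheLimit becomes 0, and every
            // tuple spills straight to disk.
            System.out.println("bytes each bag may cache: " + maxMemUsage);
        }
    }
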
>> So, after writing pig.cachedbag.memusage=0 into
>> $PIG_HOME/conf/pig.properties, my job succeeds!
>>
>> You can also set it to an appropriate value to fully utilize your
>> memory as a cache.
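
[For reference, a hypothetical pig.properties fragment along these lines; the 0.1 alternative is only an illustration, not a value from this thread.]

    # $PIG_HOME/conf/pig.properties
    # Fraction of the JVM heap each InternalCachedBag may use before it spills.
    # 0 forces every tuple to disk -- but note the DeleteOnExitHook caveat above.
    pig.cachedbag.memusage=0
    # A small non-zero fraction keeps some in-memory caching instead, e.g.:
    # pig.cachedbag.memusage=0.1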
>>
>> Hope this is useful for others.
>> Thanks.
>>
>>
>> Haitao Yao
>> [EMAIL PROTECTED]
>> weibo: @haitao_yao
>> Skype:  haitao.yao.final
>>
>> On 2012-7-10, at 1:06 PM, Haitao Yao wrote:
>>
>>> My reducers get 512 MB: -Xms512M -Xmx512M.
>>> The reducer does not get an OOM when I manually invoke spill in my case.
>>>
>>> Can you explain more about your solution?
>>> And can your solution fit into a 512 MB reducer process?
>>> Thanks very much.
>>>
>>>
>>>
>>> Haitao Yao
>>> [EMAIL PROTECTED]
>>> weibo: @haitao_yao
>>> Skype:  haitao.yao.final
>>>
>>> On 2012-7-10, at 12:26 PM, Jonathan Coveney wrote:
>>>
>>>> I have something in the mix that should reduce bag memory :)
>>>> Question: how much memory are your reducers getting? In my
>>>> experience, you'll get OOMs on spilling if you have allocated less
>>>> than a gig to the JVM.
>>>>
>>>> 2012/7/9 Haitao Yao <[EMAIL PROTECTED]>
>>>>
>>>>> I have encountered a similar problem, and I got an OOM while
>>>>> running the reducer.
>>>>> I think the reason is that the data bag generated after GROUP ALL
>>>>> is too big to fit into the reducer's memory.
>>>>>
>>>>> I have written a new COUNT implementation that explicitly invokes
>>>>> System.gc() and spill after the COUNT function finishes its job,
>>>>> but it still gets an OOM.
>>>>>
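[An alternative worth noting to explicit System.gc()/spill() calls: if the UDF implements Pig's Accumulator interface (as the built-in COUNT does), Pig can feed the grouped tuples to it in batches, so the whole group-all bag never needs to be materialized in the reducer at once. A rough sketch, with an illustrative class name.]

    import java.io.IOException;
    import org.apache.pig.Accumulator;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;

    public class StreamingCount extends EvalFunc<Long> implements Accumulator<Long> {
        private long count = 0;

        // Fallback path, used when Pig cannot run the UDF accumulatively:
        // the whole bag is materialized, which is exactly the OOM risk above.
        @Override
        public Long exec(Tuple input) throws IOException {
            DataBag bag = (DataBag) input.get(0);
            return bag.size();
        }

        // Accumulative path: called repeatedly with a bag holding only the
        // next batch of tuples for the current group.
        @Override
        public void accumulate(Tuple input) throws IOException {
            DataBag bag = (DataBag) input.get(0);
            count += bag.size();
        }

        @Override
        public Long getValue() {
            return count;
        }

        @Override
        public void cleanup() {
            count = 0;
        }
    }
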
>>>>> Here's the code of the new COUNT implementation:
>>>>>     @Override
>>>>>     public Long exec(Tuple input) throws IOException {
>>>>>         DataBag bag = (DataBag) input.get(0);