Re: What is the best way to do counting in pig?
I agree that more documentation around parameters would be good and would
force some consistency. I saw you made one ticket; can you make a ticket for
this as well?

Good find :)

2012/7/10 Haitao Yao <[EMAIL PROTECTED]>

> Oh, new discovery: we cannot set pig.cachedbag.memusage = 0, because
> every time the InternalCachedBag spills, it creates a new tmp file in
> java.io.tmpdir. If we set pig.cachedbag.memusage to 0, every new tuple
> added to the InternalCachedBag creates a new tmp file, and each tmp file
> is only deleted on exit.
> So, if you're unlucky like me, you will get an OOM exception caused by
> java.io.DeleteOnExitHook!
> Here's the evidence:
>
> God, we really need a full description of how every parameter works.
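
A minimal sketch of the leak described above, assuming only standard
java.io behavior: File.deleteOnExit() records each path in
java.io.DeleteOnExitHook's static set, and entries are released only at JVM
shutdown, so one temp file per spilled tuple accumulates on the heap until
an OutOfMemoryError. The class and file names here are illustrative.

    import java.io.File;
    import java.io.IOException;

    // Illustrative only: mirrors the spill-per-tuple pattern described above.
    public class DeleteOnExitLeak {
        public static void main(String[] args) throws IOException {
            for (long i = 0; ; i++) {
                File tmp = File.createTempFile("pigbag", ".tmp");
                tmp.deleteOnExit(); // path is retained in DeleteOnExitHook's set
                tmp.delete();       // deleting the file does not remove that entry
                if (i % 100000 == 0) {
                    System.out.println(i + " paths registered so far");
                }
            }
        }
    }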
>
>
>
> Haitao Yao
> [EMAIL PROTECTED]
> weibo: @haitao_yao
> Skype:  haitao.yao.final
>
> On 2012-7-10, at 4:20 PM, Haitao Yao wrote:
>
> I found the solution.
>
> After analyzing the heap dump taken while the reducer hit the OOM, I found
> that the memory is consumed by org.apache.pig.data.InternalCachedBag;
> here's the diagram:
> <cc.jpg>
>
> In the source code of org.apache.pig.data.InternalCachedBag, I found
> that there's a parameter controlling the cache limit:
>     public InternalCachedBag(int bagCount) {
>         float percent = 0.2F;
>
>         if (PigMapReduce.sJobConfInternal.get() != null) {
>             // here: this is where the cache limit comes from!
>             String usage = PigMapReduce.sJobConfInternal.get().get(
>                     "pig.cachedbag.memusage");
>             if (usage != null) {
>                 percent = Float.parseFloat(usage);
>             }
>         }
>
>         init(bagCount, percent);
>     }
>
>     private void init(int bagCount, float percent) {
>         factory = TupleFactory.getInstance();
>         mContents = new ArrayList<Tuple>();
>
>         long max = Runtime.getRuntime().maxMemory();
>         maxMemUsage = (long) (((float) max * percent) / (float) bagCount);
>         cacheLimit = Integer.MAX_VALUE;
>
>         // set limit to 0, if memusage is 0 or really really small.
>         // then all tuples are put into disk
>         if (maxMemUsage < 1) {
>             cacheLimit = 0;
>         }
>         log.warn("cacheLimit: " + this.cacheLimit);
>         addDone = false;
>     }
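
To make the formula concrete, here is a rough walk-through under the 512 MB
reducer heap mentioned later in the thread (bagCount = 1 is an assumed
value for illustration):

    // Hypothetical numbers, not taken from the thread's actual job.
    long max = 512L * 1024 * 1024;   // maxMemory() under -Xmx512M
    float percent = 0.2F;            // the pig.cachedbag.memusage default
    int bagCount = 1;                // assumed: one cached bag in the plan
    long maxMemUsage = (long) (((float) max * percent) / (float) bagCount);
    // maxMemUsage comes out to roughly 107 MB (about 102 MiB) of tuples
    // cached in memory before spilling. With pig.cachedbag.memusage=0,
    // maxMemUsage < 1, so cacheLimit = 0 and every tuple goes to disk.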
>
> So, after writing pig.cachedbag.memusage=0 into
> $PIG_HOME/conf/pig.properties, my job succeeds!
>
> You can also set it to an appropriate value to fully utilize your memory
> as a cache.
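
For reference, the property can be set globally in pig.properties or per
script; the 0.1 below is just an example value:

    # $PIG_HOME/conf/pig.properties  (0.2 is the default fraction of the heap)
    pig.cachedbag.memusage=0.1

The per-script equivalent would be a set command in the script itself:
set pig.cachedbag.memusage 0.1;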
>
> Hope this is useful for others.
> Thanks.
>
>
> Haitao Yao
> [EMAIL PROTECTED]
> weibo: @haitao_yao
> Skype:  haitao.yao.final
>
> On 2012-7-10, at 1:06 PM, Haitao Yao wrote:
>
> My reducers get 512 MB: -Xms512M -Xmx512M.
> In my case, the reducer does not hit the OOM when spill is invoked
> manually.
>
> Can you explain more about your solution?
> And can your solution fit into a 512 MB reducer process?
> Thanks very much.
>
>
>
> Haitao Yao
> [EMAIL PROTECTED]
> weibo: @haitao_yao
> Skype:  haitao.yao.final
>
> On 2012-7-10, at 12:26 PM, Jonathan Coveney wrote:
>
> I have something in the mix that should reduce bag memory. :) Question:
> how much memory are your reducers getting? In my experience, you'll get
> OOMs on spilling if you have allocated less than a gig to the JVM.
>
> 2012/7/9 Haitao Yao <[EMAIL PROTECTED]>
>
> I have encountered a similar problem, and I got an OOM while running the
> reducer. I think the reason is that the data bag generated after GROUP ALL
> (sketched below) is too big to fit into the reducer's memory.
>
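
For context, the counting pattern under discussion is presumably the usual
global-count idiom; a sketch with assumed relation and input names:

    -- GROUP ALL funnels every tuple into a single bag on one reducer;
    -- that bag is what can outgrow the reducer's heap.
    a = LOAD 'input' AS (f1:chararray);
    b = GROUP a ALL;
    c = FOREACH b GENERATE COUNT(a);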
>
> I have written a new COUNT implementation that explicitly invokes
> System.gc() and spill() after the COUNT function finishes its job, but it
> still gets an OOM.
>
> here's the code of the new COUNT implementation:
>
>     @Override
>     public Long exec(Tuple input) throws IOException {
>         DataBag bag = (DataBag) input.get(0);
>         Long result = super.exec(input);
>         LOG.warn(" before spill data bag memory : "
>                 + Runtime.getRuntime().freeMemory());
>         bag.spill();
>         System.gc();
>         LOG.warn(" after spill data bag memory : "
>                 + Runtime.getRuntime().freeMemory());
>         return result;
>     }
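
Assuming the exec method above lives in a COUNT subclass packaged into a
jar (the jar and class names below are hypothetical), it would be wired
into a script roughly like this:

    REGISTER myudfs.jar;                               -- hypothetical jar
    DEFINE SpillingCount com.example.SpillingCount();  -- hypothetical class
    b = GROUP a ALL;
    c = FOREACH b GENERATE SpillingCount(a);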