Fwd: PigServer memory leak
Posting for Bill.
---------- Forwarded message ----------
From: Bill Graham <[EMAIL PROTECTED]>
Date: Wed, Mar 10, 2010 at 10:11
Subject: Re: PigServer memory leak
Thanks for the reply, Ashutosh.

[hadoop.apache.org keeps flagging my reply as spam, so I'm replying
directly to you. Feel free to push this conversation back onto the
list, if you can. :)]

I'm running the same two scripts, one after the other, every 5
minutes. The scripts have dynamic tokens substituted to change the
input and output directories. Besides that, they have the same logic.
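
A minimal sketch of the kind of token substitution described above, assuming placeholders of the form @NAME@ as they appear in the scripts further down (@TIME_PERIOD@, @HADOOP_OUTPUT_LOCATION@). The class and method names are hypothetical and are not taken from the actual daemon; the real substitution code was not posted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: replaces @NAME@ placeholders in a Pig script template.
class ScriptTokens {
    // Matches placeholders such as @TIME_PERIOD@ or @HADOOP_OUTPUT_LOCATION@.
    private static final Pattern TOKEN = Pattern.compile("@([A-Z0-9_]+)@");

    static String substitute(String script, Map<String, String> values) {
        Matcher m = TOKEN.matcher(script);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = values.get(m.group(1));
            if (value == null) {
                // Fail fast rather than submit a script with an unresolved token.
                throw new IllegalArgumentException("No value for token " + m.group());
            }
            // quoteReplacement keeps '$' and '\' in the value literal.
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> values = new HashMap<>();
        values.put("HADOOP_OUTPUT_LOCATION", "/data/run1"); // example value only
        System.out.println(substitute(
            "STORE x INTO '@HADOOP_OUTPUT_LOCATION@/out';", values));
    }
}
```

Each scheduled run would build a fresh value map (new input and output directories) and pass the substituted text to whatever executes the script.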

I will try to execute the script from grunt next time it happens, but
I don't see how a lack of Pig MR optimizations could cause a memory
issue on the client. If I bounce my daemon, the next jobs to run
execute without a problem upon start, so I would expect a script
run through grunt at that time to run without a problem as well.

I reverted to re-initializing PigServer for every run. I have
other places in my scheduled workflow where I interact with HDFS, which
I've now modified to re-use an instance of Hadoop's Configuration
object for the life of the VM. I was re-initializing that many times
per run. Looking at the Configuration code, it seems to re-parse the
XML configs into a DOM every time it's called, so this certainly looks
like a place for a potential leak. If nothing else, it should give me
an optimization. Configuration seems to be stateless and read-only
after initialization, so this seems safe.
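
One way to keep a single instance for the life of the VM is a small lazy holder. This is only a sketch: SharedInstance is a hypothetical name, and the generic type stands in for Hadoop's Configuration so the example compiles without Hadoop on the classpath. In the real workflow the factory would presumably be something like `Configuration::new`.

```java
import java.util.function.Supplier;

// Hypothetical holder: creates the wrapped object once and then re-uses it.
class SharedInstance<T> {
    private final Supplier<T> factory;
    private volatile T instance; // volatile is required for double-checked locking

    SharedInstance(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T result = instance;
        if (result == null) {
            synchronized (this) {
                result = instance;
                if (result == null) {
                    // The expensive construction happens only on first use.
                    result = factory.get();
                    instance = result;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // StringBuilder is just a stand-in for an expensive-to-build object.
        SharedInstance<StringBuilder> shared = new SharedInstance<>(StringBuilder::new);
        System.out.println(shared.get() == shared.get());
    }
}
```

Sharing one object this way is only appropriate if callers treat it as read-only after it is built, which is the assumption stated above.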

Anyway, here are my two scripts. The first generates summaries, the
second makes a report from the summaries and they run in separate
PigServer instances via registerQuery(..). Let me know if you see
anything that seems off:
define chukwaLoader org.apache.hadoop.chukwa.
define tokenize     com.foo.hadoop.mapreduce.pig.udf.TOKENIZE();
define regexMatch   com.foo.hadoop.mapreduce.pig.udf.REGEX_MATCH();
define timePeriod   org.apache.hadoop.chukwa.TimePartition('@TIME_PERIOD@');

    USING chukwaLoader AS (ts: long, fields);
bodies = FOREACH raw GENERATE
    tokenize((chararray)fields#'body') as tokens,
    timePeriod(ts) as time;

-- pull values out of the URL
tokens1 = FOREACH bodies GENERATE
      (int)regexMatch($0.token4, '(?:[?&])ptId=([^&]*)', 1) as pageTypeId,
      (int)regexMatch($0.token4, '(?:[?&])sId=([^&]*)', 1) as siteId,
      (int)regexMatch($0.token4, '(?:[?&])aId=([^&]*)', 1) as assetId, time,
      regexMatch($0.token4, '(?:[?&])tag=([^&]*)', 1) as tagValue;

-- filter out entries without an assetId
tokens2 = FILTER tokens1 BY
    (assetId is not null) AND (pageTypeId is not null) AND (siteId is not null);

-- group by tagValue, time, assetId and flatten to get counts
grouped = GROUP tokens2 BY (tagValue, time, assetId, pageTypeId, siteId);
flattened = FOREACH grouped GENERATE
    FLATTEN(group) as (tagValue, time, assetId, pageTypeId, siteId),
    COUNT(tokens2) as count;

shifted = FOREACH flattened GENERATE
    time, count, assetId, pageTypeId, siteId, tagValue;

-- order and store
ordered = ORDER shifted BY
    tagValue ASC, count DESC, assetId DESC, pageTypeId ASC, siteId ASC, time DESC;

        (ts: long, count: int, assetId: int, pageTypeId: chararray,
siteId: int, tagValue: chararray);

-- now store most popular overall - filtered by pageTypeId
most_popular_filtered = FILTER raw BY
    (siteId == 162) AND (pageTypeId matches
most_popular = GROUP most_popular_filtered BY (ts, assetId, pageTypeId);
most_popular_flattened = FOREACH most_popular GENERATE
    FLATTEN(group) as (ts, assetId, pageTypeId),
    SUM(most_popular_filtered.count) as count;
most_popular_shifted = FOREACH most_popular_flattened
    GENERATE ts, count, assetId, (int)pageTypeId;

most_popular_ordered = ORDER most_popular_shifted
    BY ts DESC, count DESC, assetId ASC, pageTypeId ASC;
STORE most_popular_ordered INTO

'@HADOOP_OUTPUT_LOCATION@/most_popular_by_pagetype_temp' USING
      '@HADOOP_OUTPUT_LOCATION@/most_popular_by_pagetype/', '3', 'none', '\t');

most_popular_by_tag_filtered = FILTER raw BY
    (siteId == 162) AND (tagValue is not null);
most_popular_by_tag = GROUP most_popular_by_tag_filtered
    BY (ts, assetId, pageTypeId, tagValue);
most_popular_flattened = FOREACH most_popular_by_tag
    GENERATE FLATTEN(group) as (ts, assetId, pageTypeId, tagValue),
    SUM(most_popular_by_tag_filtered.count) as count;
most_popular_by_tag_shifted = FOREACH most_popular_flattened
    GENERATE ts, count, assetId, (int)pageTypeId, tagValue;
most_popular_by_tag_ordered = ORDER most_popular_by_tag_shifted
    BY ts DESC, tagValue ASC, count DESC, assetId ASC, pageTypeId ASC;
STORE most_popular_by_tag_ordered INTO
'@HADOOP_OUTPUT_LOCATION@/most_popular_by_tag_temp' USING
      '@HADOOP_OUTPUT_LOCATION@/most_popular_by_tag/', '4', 'none', '\t');
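
For readers unfamiliar with the patterns used in the first script, the following sketch shows what a call like regexMatch($0.token4, '(?:[?&])sId=([^&]*)', 1) would extract, assuming the UDF returns the requested capture group of the first match and null when nothing matches. REGEX_MATCH is a custom UDF whose source was not posted, so that behavior is an assumption, and the sample URL is invented for illustration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for the custom REGEX_MATCH UDF used in the script.
class UrlParams {
    static String regexMatch(String input, String regex, int group) {
        if (input == null) {
            return null;
        }
        Matcher m = Pattern.compile(regex).matcher(input);
        // find() locates the first occurrence anywhere in the input.
        return m.find() ? m.group(group) : null;
    }

    public static void main(String[] args) {
        String url = "/page?ptId=12&sId=162&aId=9001"; // invented sample URL
        System.out.println(regexMatch(url, "(?:[?&])ptId=([^&]*)", 1));
        System.out.println(regexMatch(url, "(?:[?&])sId=([^&]*)", 1));
        System.out.println(regexMatch(url, "(?:[?&])tag=([^&]*)", 1));
    }
}
```

Under that assumption, a URL without a tag parameter yields a null tagValue, which is why the second script filters on tagValue is not null before grouping by tag.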

On Tue, Mar 9, 2010 at 9:13 PM, Ashutosh Chauhan