Pig >> mail # user >> Fwd: PigServer memory leak


Fwd: PigServer memory leak
Posting for Bill.
---------- Forwarded message ----------
From: Bill Graham <[EMAIL PROTECTED]>
Date: Wed, Mar 10, 2010 at 10:11
Subject: Re: PigServer memory leak
To: [EMAIL PROTECTED]
Thanks for the reply, Ashutosh.

[hadoop.apache.org keeps flagging my reply as spam, so I'm replying
directly to you. Feel free to push this conversation back onto the
list, if you can. :)]

I'm running the same two scripts, one after the other, every 5
minutes. The scripts have dynamic tokens substituted to change the
input and output directories. Besides that, they have the same logic.

I will try to execute the script from grunt the next time it happens, but
I don't see how a lack of Pig MR optimizations could cause a memory
issue on the client. If I bounce my daemon, the next jobs run without a
problem, so I would also expect a script run through grunt at that time
to succeed as well.

I reverted to re-initializing PigServer for every run. I have other
places in my scheduled workflow where I interact with HDFS, which I've
now modified to re-use a single instance of Hadoop's Configuration
object for the life of the VM; previously I was re-initializing it many
times per run. Looking at the Configuration code, it seems to re-parse
the XML configs into a DOM every time one is constructed, so this
certainly looks like a potential leak. If nothing else, it should be an
optimization. Configuration appears to be stateless and read-only after
initialization, so this seems safe.
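The Configuration-reuse idea above can be sketched as a simple holder class. This is a minimal illustration, not code from the post; the class and method names (ConfHolder, get()) are made up, and it assumes the Hadoop client jars are on the classpath.

```java
// Hedged sketch: cache one Hadoop Configuration for the life of the VM
// instead of constructing a new one per run. Names here are illustrative.
import org.apache.hadoop.conf.Configuration;

public final class ConfHolder {
    // Constructing a Configuration re-parses the XML config files
    // (core-site.xml etc.) into a DOM, so reusing a single instance
    // avoids repeated parsing work on every run.
    private static final Configuration CONF = new Configuration();

    private ConfHolder() {}

    public static Configuration get() {
        return CONF;
    }
}
```

Callers that previously did `new Configuration()` each cycle would call `ConfHolder.get()` instead; this is only safe as long as nothing mutates the shared instance.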

Anyway, here are my two scripts. The first generates summaries; the
second builds a report from the summaries. They run in separate
PigServer instances via registerQuery(..). Let me know if you see
anything that seems off:
define chukwaLoader org.apache.hadoop.chukwa.ChukwaStorage();
define tokenize     com.foo.hadoop.mapreduce.pig.udf.TOKENIZE();
define regexMatch   com.foo.hadoop.mapreduce.pig.udf.REGEX_MATCH();
define timePeriod   org.apache.hadoop.chukwa.TimePartition('@TIME_PERIOD@');

raw = LOAD '@HADOOP_INPUT_LOCATION@'
    USING chukwaLoader AS (ts: long, fields);
bodies = FOREACH raw GENERATE
    tokenize((chararray)fields#'body') as tokens,
    timePeriod(ts) as time;

-- pull values out of the URL
tokens1 = FOREACH bodies GENERATE
      (int)regexMatch($0.token4, '(?:[?&])ptId=([^&]*)', 1) as pageTypeId,
      (int)regexMatch($0.token4, '(?:[?&])sId=([^&]*)', 1) as siteId,
      (int)regexMatch($0.token4, '(?:[?&])aId=([^&]*)', 1) as assetId, time,
      regexMatch($0.token4, '(?:[?&])tag=([^&]*)', 1) as tagValue;

-- filter out entries without an assetId
tokens2 = FILTER tokens1 BY
    (assetId is not null) AND (pageTypeId is not null) AND (siteId is not null);

-- group by tagValue, time, assetId and flatten to get counts
grouped = GROUP tokens2 BY (tagValue, time, assetId, pageTypeId, siteId);
flattened = FOREACH grouped GENERATE
    FLATTEN(group) as (tagValue, time, assetId, pageTypeId, siteId),
    COUNT(tokens2) as count;

shifted = FOREACH flattened GENERATE
    time, count, assetId, pageTypeId, siteId, tagValue;

-- order and store
ordered = ORDER shifted BY
    tagValue ASC, count DESC, assetId DESC, pageTypeId ASC, siteId ASC, time DESC;
STORE ordered INTO '@HADOOP_OUTPUT_LOCATION@';

raw = LOAD '@HADOOP_INPUT_LOCATION@' USING PigStorage('\t') AS
    (ts: long, count: int, assetId: int, pageTypeId: chararray,
     siteId: int, tagValue: chararray);

-- now store most popular overall - filtered by pageTypeId
most_popular_filtered = FILTER raw BY
    (siteId == 162) AND
    (pageTypeId matches '(2100)|(1606)|(1801)|(2300)|(2718)');
most_popular = GROUP most_popular_filtered BY (ts, assetId, pageTypeId);
most_popular_flattened = FOREACH most_popular GENERATE
    FLATTEN(group) as (ts, assetId, pageTypeId),
    SUM(most_popular_filtered.count) as count;
most_popular_shifted = FOREACH most_popular_flattened
    GENERATE ts, count, assetId, (int)pageTypeId;

most_popular_ordered = ORDER most_popular_shifted
    BY ts DESC, count DESC, assetId ASC, pageTypeId ASC;
STORE most_popular_ordered INTO '@HADOOP_OUTPUT_LOCATION@/most_popular_overall/';

STORE most_popular_ordered INTO '@HADOOP_OUTPUT_LOCATION@/most_popular_by_pagetype_temp'
    USING com.foo.hadoop.mapreduce.pig.storage.MultiStorage(
        '@HADOOP_OUTPUT_LOCATION@/most_popular_by_pagetype/', '3', 'none', '\t');

most_popular_by_tag_filtered = FILTER raw BY
    (siteId == 162) AND (tagValue is not null);
most_popular_by_tag = GROUP most_popular_by_tag_filtered BY
    (ts, assetId, pageTypeId, tagValue);
-- renamed from most_popular_flattened to avoid shadowing the alias defined above
most_popular_by_tag_flattened = FOREACH most_popular_by_tag GENERATE
    FLATTEN(group) as (ts, assetId, pageTypeId, tagValue),
    SUM(most_popular_by_tag_filtered.count) as count;
most_popular_by_tag_shifted = FOREACH most_popular_by_tag_flattened
    GENERATE ts, count, assetId, (int)pageTypeId, tagValue;
most_popular_by_tag_ordered = ORDER most_popular_by_tag_shifted
    BY ts DESC, tagValue ASC, count DESC, assetId ASC, pageTypeId ASC;
STORE most_popular_by_tag_ordered INTO '@HADOOP_OUTPUT_LOCATION@/most_popular_by_tag_temp'
    USING com.foo.hadoop.mapreduce.pig.storage.MultiStorage(
        '@HADOOP_OUTPUT_LOCATION@/most_popular_by_tag/', '4', 'none', '\t');
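The "re-initialize PigServer for every run" pattern mentioned earlier might look roughly like this. It is a sketch under assumptions: the class name and scriptPath parameter are made up, it presumes the script file already has its @TOKEN@ markers substituted (as in the poster's setup), and it requires the Pig and Hadoop jars on the classpath.

```java
// Hedged sketch: a fresh PigServer per scheduled run, shut down afterwards.
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;

public class ScheduledPigRun {
    // scriptPath points at a Pig Latin file whose dynamic tokens have
    // already been substituted for this run (hypothetical setup).
    public static void runOnce(String scriptPath) throws Exception {
        PigServer pig = new PigServer(ExecType.MAPREDUCE);
        try {
            // STORE statements in the script trigger execution as it is read.
            pig.registerScript(scriptPath);
        } finally {
            pig.shutdown(); // release client-side state each cycle
        }
    }
}
```

Discarding the PigServer each cycle trades some startup cost for a bounded client footprint, which is the behavior the poster observed after reverting.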

On Tue, Mar 9, 2010 at 9:13 PM, Ashutosh Chauhan
<[EMAIL PROTECTED]> wrote: