Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Pig >> mail # user >> Problems with Join in pig


Copy link to this message
-
Problems with Join in pig
The following pig script runs fine without the 2GB memory setting (see in yellow). But fails with memory setting. I am not sure what's happening. It's a simple operation of joining one tuple(of 1 row) with the other tuple.
Here is what I am trying to do:

 1.  grouping all SELECT HIT TIME DATA into a single tuple by doing a GROUP ALL.
 2.  getting the min and max of that set and putting it into MIN HIT DATA. This is a tuple with a single row.
 3.  then grouping SELECT MAX VISIT TIME DATA by visid,
 4.  then generating  DUMMY_KEY  for every row, along with MAX of start time.
 5.  then try to join the single tuple in 2 with all tuples generated in 4 to get a min time and a max time

Code:
Shell prompt:
## setting heap size to 2 GB
PIG_OPTS="$PIG_OPTS -Dmapred.child.java.opts=-Xmx2048m"
export PIG_OPTS

Pig/Grunt

RAW_DATA = LOAD '/omniture_test_qa/cleansed_output_1/2011/01/05/wdgesp360/wdgesp360_2011-01-05*.tsv.gz' USING PigStorage('\t');
FILTER_EXCLUDES_DATA = FILTER RAW_DATA BY $6 <= 0;
SELECT_CAST_DATA = FOREACH FILTER_EXCLUDES_DATA GENERATE 'DUMMYKEY' AS DUMMY_KEY,(int)$0 AS hit_time_gmt, (long)$2 AS visid_high, (long)$3 AS visid_low, (chararray)$5 AS truncated_hit;
SELECT_DATA = FILTER SELECT_CAST_DATA BY truncated_hit =='N';
 --MIN AND MAX_HIT_TIME_GMT FOR THE FILE/SUITE
SELECT_HIT_TIME_DATA = FOREACH SELECT_DATA GENERATE (int)hit_time_gmt;
GROUPED_ALL_DATA = GROUP SELECT_HIT_TIME_DATA ALL PARALLEL 100;
MIN_HIT_DATA = FOREACH GROUPED_ALL_DATA  GENERATE 'DUMMYKEY'AS DUMMY_KEY,MIN(SELECT_HIT_TIME_DATA.hit_time_gmt) AS MIN_HIT_TIME_GMT,MAX(SELECT_HIT_TIME_DATA.hit_time_gmt) AS MAX_HIT_TIME_GMT;
 ---MAX_VISIT_START_TIME BY VISITOR_ID
SELECT_MAX_VISIT_TIME_DATA =  FOREACH SELECT_DATA GENERATE visid_high,visid_low,visit_start_time_gmt;
GROUP_BY_VISID_MAX_VISIT_TIME_DATA = GROUP SELECT_MAX_VISIT_TIME_DATA  BY (visid_high,visid_low) PARALLEL 100;
MAX_VISIT_TIME = FOREACH GROUP_BY_VISID_MAX_VISIT_TIME_DATA GENERATE  'DUMMYKEY' AS DUMMY_KEY,FLATTEN(group.visid_high) AS visid_high,FLATTEN(group.visid_low) AS visid_low, MAX(SELECT_MAX_VISIT_TIME_DATA.visit_start_time_gmt) AS MAX_VISIT_START_TIME;
JOINED_MAX_VISIT_TIME_DATA = COGROUP MAX_VISIT_TIME BY DUMMY_KEY OUTER,MIN_HIT_DATA BY DUMMY_KEY OUTER PARALLEL 100;
MIN_MAX_VISIT_HIT_TIME = FOREACH JOINED_MAX_VISIT_TIME_DATA GENERATE FLATTEN(MAX_VISIT_TIME.visid_high),FLATTEN(MAX_VISIT_TIME.visid_low),FLATTEN(MAX_VISIT_TIME.MAX_VISIT_START_TIME),FLATTEN(MIN_HIT_DATA.MIN_HIT_TIME_GMT),FLATTEN(MIN_HIT_DATA.MAX_HIT_TIME_GMT);
 DUMP MIN_MAX_VISIT_HIT_TIME;
Can any one please guide me through this problem?
Thanks
Sri