Re: Flattening nested bags
What version of Pig are you using? The example I'm providing will only work
on 0.9+ because it uses nested foreach statements.

Getting from the flattened and joined state to the grouped state you
achieved shouldn't take 5 inelegant steps. I'll give you the full working
example, which I tested with the 3 records in your example.

First things first. Let's load the data, flatten it and join it on the
second dataset:
A = LOAD 'data1'
        USING PigStorage()
        AS (item: chararray, d: int,
            things: bag{(thing: chararray, d1: int, values: bag{(v: chararray)})});
B = FOREACH A GENERATE item, d, FLATTEN(things);
C = FOREACH B GENERATE item, d, thing, d1, FLATTEN(values);
D = LOAD 'data2'
        USING PigStorage()
        AS (value: chararray, result: chararray);
E = JOIN C BY things::values::v, D BY value;
----------------------------E----------------------------------
(item1,111,thing1,222,value1,value1,result1)
(item1,111,thing1,222,value2,value2,result2)
----------------------------------------------------------------
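The long names in the join key (things::values::v) come from FLATTEN, which prefixes each flattened field with the name of the bag it came from. If the exact names differ in your version, DESCRIBE prints the schema of an alias. A small sketch, using only the aliases defined above:

```pig
-- Print the schemas so the prefixed (::) field names are visible
-- before they are used in the JOIN and in later projections.
DESCRIBE C;
DESCRIBE E;
```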

At this point, you have data1 joined to data2. Now you want to group your
data back. You also apparently don't care about 'item' and 'd', so you can
project those out as well.
F = FOREACH E GENERATE C::things::thing as thing,
                       C::things::d1 as d1,
                       D::value as value,
                       D::result as result;
G = GROUP F by (thing, d1);
----------------------------G----------------------------------
((thing1,222),{(thing1,222,value2,result2),(thing1,222,value1,result1)})
-----------------------------------------------------------------

Now, you have 'thing' and 'd1' duplicated. For each tuple in G, your inner
bag needs only 'value' and 'result'. You can do this by using a nested
foreach statement.
H = FOREACH G {
        I = FOREACH F GENERATE value, result;
        GENERATE group, I;
};
----------------------------H----------------------------------
((thing1,222),{(value2,result2),(value1,result1)})
----------------------------------------------------------------
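If a nested foreach is not an option, a bag projection should give the same inner bag. This is an untested sketch that assumes the aliases F and G from above and keeps the name I so the later statements still apply:

```pig
-- Bag projection: keep only 'value' and 'result' from each tuple
-- of the inner bag F, and name the resulting bag I.
H = FOREACH G GENERATE group, F.(value, result) AS I;
```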

The inner bag looks good, but you don't want 'd1' in the tuple with
'thing'. You want 'd1' in a tuple with your value-result-pair-bag. You can
use the TOTUPLE UDF to accomplish this.
J = FOREACH H GENERATE group.thing as thing, TOTUPLE(group.d1, I) as values;
----------------------------J----------------------------------
(thing1,(222,{(value2,result2),(value1,result1)}))
----------------------------------------------------------------

Almost there. Now you want a bag around
(222,{(value2,result2),(value1,result1)}). First, you group by 'thing'.
K = GROUP J by thing;
----------------------------K----------------------------------
(thing1,{(thing1,(222,{(value2,result2),(value1,result1)}))})
----------------------------------------------------------------

Now you just need to get rid of the duplicate 'thing' in the bag. This is a
bit tricky, but can be done with a nested foreach and a flatten operator.
L = FOREACH K {
        M = FOREACH J GENERATE FLATTEN(values);
        GENERATE group as thing, M;
};
----------------------------L----------------------------------
(thing1,{(222,{(value2,result2),(value1,result1)})})
----------------------------------------------------------------

Voila! You have the result you are looking for.

Now the full script together:

A = LOAD 'data1'
        USING PigStorage()
        AS (item: chararray, d: int,
            things: bag{(thing: chararray, d1: int, values: bag{(v: chararray)})});
B = FOREACH A GENERATE item, d, FLATTEN(things);
C = FOREACH B GENERATE item, d, thing, d1, FLATTEN(values);
D = LOAD 'data2'
        USING PigStorage()
        AS (value: chararray, result: chararray);
E = JOIN C BY things::values::v, D BY value;
F = FOREACH E GENERATE C::things::thing AS thing,
                       C::things::d1 AS d1,
                       D::value AS value,
                       D::result AS result;
G = GROUP F BY (thing, d1);
H = FOREACH G {
        I = FOREACH F GENERATE value, result;
        GENERATE group, I;
};
J = FOREACH H GENERATE group.thing AS thing, TOTUPLE(group.d1, I) AS values;
K = GROUP J BY thing;
L = FOREACH K {
        M = FOREACH J GENERATE FLATTEN(values);
        GENERATE group AS thing, M;
};

Obviously, you'll have to deal with the corner cases such as handling null
values and such, but this would be the bulk of the algorithm.
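As one illustration of such a corner case: an inner JOIN drops records whose key is null or has no match in data2, and FLATTEN of an empty bag produces no row at all. If unmatched values should survive with a null result, a left outer join is one option. This is a sketch under that assumption, and E_unmatched is a hypothetical alias used only for inspection:

```pig
-- Assumption: values missing from data2 should be kept with a null result.
E = JOIN C BY things::values::v LEFT OUTER, D BY value;

-- Hypothetical helper alias: list the records that found no match in data2.
E_unmatched = FILTER E BY D::value IS NULL;
```

With this variant, D::value is null for unmatched records, so F would need to project C::things::values::v instead of D::value to keep the original value.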

On a side note, to answer your original question of how to flatten nested
structures: you can either use two separate foreach statements with flatten
operations, as in the script above, or use a nested foreach statement.

The first way looks like this:
A = LOAD 'data1'
        USING PigStorage()
        AS (item: chararray, d: int,
            things: bag{(thing: chararray, d1: int, values: bag{(v: chararray)})});
B = FOREACH A GENERATE item, d, FLATTEN(things);
C = FOREACH B GENERATE item, d, thing, d1, FLATTEN(values);
This generates the following execution plan:
MR plan size after optimization: 1
#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-23
Map Plan
C: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-22
|
|---C: New For Each(false,false,false,false,true)[bag] - scope-21
    |   |
    |   Project[chararray][0] - scope-11
    |   |
    |   Project[int][1] - scope-13
    |   |
    |   Project[chararray][2] - scope-15
    |   |
    |   Project[int][3] - scope-17
    |   |
    |   Project[bag][4] - scope-19
    |
    |---B: New For Each(false,false,true)[bag] - scope-10
        |   |
        |   Cast[chararray] - scope-2
        |   |
        |   |---Project[bytearray][0] - scope-1
        |   |
        |   Cast[int] - scope-5
        |   |
        |   |---Project[bytearray][1] - scope-4
        |   |
        |   Cast[bag:{(chararray,int,{(chararray)})}] - scope-8
        |   |
        |   |---Project[bytearray][2] - scope-7
        |
        |---A:
Load(file:///home/pradeepg26/projects/pig-ml/da