Pig, mail # dev - Fwd: Push tuple/bag schema to UDF function automatically


Serega Sheypak 2013-07-25, 14:49
Hi, I have rather complex data processing routines, and I'm using many
Jython UDFs.

Here is my code example:

--project and get all visited route pivots
routePivots = FOREACH withNoIntersection GENERATE
    withIntersection::msisdn    as msisdn: long,                 --0
    withIntersection::ts        as ts: long,                     --1
    withIntersection::lac       as lac,                          --2
    withIntersection::cid       as cid,                          --3
    withIntersection::cell_type as cell_type,                    --4
    withIntersection::branch_id as branch_id,                    --5
    (withIntersection::center_lon is null ? tiledGsm::center_lon :
        withIntersection::center_lon) as center_lon: double,     --6
    (withIntersection::center_lat is null ? tiledGsm::center_lat :
        withIntersection::center_lat) as center_lat: double,     --7
    (withIntersection::tile_id    is null ? tiledGsm::tile_id :
        withIntersection::tile_id)    as tile_id:    int,        --8
    (withIntersection::zone_col   is null ? tiledGsm::zone_col :
        withIntersection::zone_col)   as zone_col:   int,        --9
    (withIntersection::zone_row   is null ? tiledGsm::zone_row :
        withIntersection::zone_row)   as zone_row:   int,        --10
    withIntersection::is_active as is_active;                    --11

--filter non valid route pivots and store them
routePivotsGroupedByMsisdn = GROUP routePivots BY msisdn;
markedPivots = FOREACH routePivotsGroupedByMsisdn {
    ordered = ORDER routePivots BY ts;
    GENERATE FLATTEN(udf.filter_route_pivots(ordered, 55, 10000, 'ts:1, lon:6, lat:7'))
        as (msisdn:     long,      --0
            ts:         long,      --1
            lac:        int,       --2
            cid:        int,       --3
            cell_type:  chararray, --4
            branch_id:  int,       --5
            center_lon: double,    --6
            center_lat: double,    --7
            tile_id:    int,       --8
            zone_col:   int,       --9
            zone_row:   int,       --10
            is_active:  boolean,   --11
            avg_speed:  double,    --12
            distance:   int,       --13
            not_valid:  int);      --14
}
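
Because of the nested ORDER routePivots BY ts, the UDF receives each msisdn's pivots sorted by timestamp, so it can compare neighbours (the pivot and next_pivot in the UDF code below). Here is a minimal sketch of walking such a bag in plain Python. It assumes the bag arrives as a list of tuples, which is my understanding of how Pig's Jython bridge converts bags. The helper names consecutive_pairs and time_gaps are hypothetical, not part of Pig.

```python
def consecutive_pairs(ordered_pivots):
    """Yield (pivot, next_pivot) for each neighbouring pair in a list already sorted by ts."""
    for i in range(len(ordered_pivots) - 1):
        yield ordered_pivots[i], ordered_pivots[i + 1]


def time_gaps(ordered_pivots, pivot_schema):
    """Return next_pivot.ts - pivot.ts for every consecutive pair, looking ts up by name."""
    ts = pivot_schema['ts']  # position of the ts field, taken from the name -> index mapping
    return [next_pivot[ts] - pivot[ts]
            for pivot, next_pivot in consecutive_pairs(ordered_pivots)]
```

A bag with fewer than two tuples simply produces no pairs, so the UDF does not need a special case for single-pivot subscribers.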
As you can see, the relation routePivots has an explicit schema.
I'm invoking the UDF filter_route_pivots on a bag containing tuples with the
routePivots schema, and I'm passing a partial schema description
('ts:1, lon:6, lat:7') to the UDF.
I do this because I don't want to access tuple fields by number: that's a great
way to introduce silly bugs, and they're hard to catch. I know.
Inside the UDF, I access tuple values by name:

  ts_diff = next_pivot[pivot_schema['ts']] - pivot[pivot_schema['ts']]
  distance = RouteCalculator.getDistanceInMeters(pivot[pivot_schema['lon']],
                                                 pivot[pivot_schema['lat']],
                                                 next_pivot[pivot_schema['lon']],
                                                 next_pivot[pivot_schema['lat']])
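
The pivot_schema dictionary used in these lines has to be built from the 'ts:1, lon:6, lat:7' string passed in from the Pig script. A minimal sketch, assuming the UDF parses that string itself. parse_pivot_schema is a hypothetical helper name.

```python
def parse_pivot_schema(spec):
    """Parse a partial schema such as 'ts:1, lon:6, lat:7' into {'ts': 1, 'lon': 6, 'lat': 7}."""
    pivot_schema = {}
    for entry in spec.split(','):
        entry = entry.strip()
        if not entry:
            continue  # tolerate a trailing comma or empty entry
        name, _, index = entry.partition(':')
        pivot_schema[name.strip()] = int(index)  # int() ignores surrounding spaces
    return pivot_schema
```

Usage inside the UDF would be pivot_schema = parse_pivot_schema('ts:1, lon:6, lat:7'). The string still has to be kept in sync with the Pig schema by hand, which is exactly the duplication the question below is about.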

As you can see, I don't rely on magic numbers: my code is easier to read and
maintain, and I'm not tightly coupled to the schema and its field order.
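
To make the point about field order concrete: if every lookup goes through a name-to-index mapping, a tuple with a different layout only needs a different mapping, not different UDF code. In this sketch named_view is a hypothetical helper, and the tuple values are made up for illustration.

```python
def named_view(pivot, pivot_schema):
    """Return {field name: value} for the fields listed in pivot_schema."""
    return dict((name, pivot[index]) for name, index in pivot_schema.items())


# Layout from the script above: ts at position 1, lon at 6, lat at 7 (other fields are placeholders).
layout_a = {'ts': 1, 'lon': 6, 'lat': 7}
pivot_a = (None, 1000, None, None, None, None, 37.6, 55.7)

# Hypothetical reordered layout carrying only the three fields of interest.
layout_b = {'ts': 0, 'lon': 1, 'lat': 2}
pivot_b = (1000, 37.6, 55.7)

# Same named values, even though the positions differ.
assert named_view(pivot_a, layout_a) == named_view(pivot_b, layout_b)
```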

Does Pig provide a way to push the schema down to a UDF? I've already declared
it, so why can't I use it inside my UDF?
I think such an approach would be very useful.