Pig >> mail # user >> Push tuple/bag schema to UDF function automatically


Push tuple/bag schema to UDF function automatically
Hi, I have rather complex data processing routines, and I'm using many
Jython UDFs.

Here is my code example:

--project and get all visited route pivots
routePivots = FOREACH withNoIntersection GENERATE
    withIntersection::msisdn    as msisdn: long,     --0
    withIntersection::ts        as ts: long,         --1
    withIntersection::lac       as lac,              --2
    withIntersection::cid       as cid,              --3
    withIntersection::cell_type as cell_type,        --4
    withIntersection::branch_id as branch_id,        --5
    (withIntersection::center_lon is null ? tiledGsm::center_lon :
        withIntersection::center_lon) as center_lon: double,  --6
    (withIntersection::center_lat is null ? tiledGsm::center_lat :
        withIntersection::center_lat) as center_lat: double,  --7
    (withIntersection::tile_id    is null ? tiledGsm::tile_id :
        withIntersection::tile_id)    as tile_id:    int,     --8
    (withIntersection::zone_col   is null ? tiledGsm::zone_col :
        withIntersection::zone_col)   as zone_col:   int,     --9
    (withIntersection::zone_row   is null ? tiledGsm::zone_row :
        withIntersection::zone_row)   as zone_row:   int,     --10
    withIntersection::is_active as is_active;

--filter non valid route pivots and store them
routePivotsGroupedByMsisdn = GROUP routePivots BY msisdn;
markedPivots = FOREACH routePivotsGroupedByMsisdn {
    ordered = ORDER routePivots BY ts;
    GENERATE FLATTEN(udf.filter_route_pivots(ordered, 55, 10000, 'ts:1, lon:6, lat:7'))
        as (msisdn:     long,      --0
            ts:         long,      --1
            lac:        int,       --2
            cid:        int,       --3
            cell_type:  chararray, --4
            branch_id:  int,       --5
            center_lon: double,    --6
            center_lat: double,    --7
            tile_id:    int,       --8
            zone_col:   int,       --9
            zone_row:   int,       --10
            is_active:  boolean,   --11
            avg_speed:  double,    --12
            distance:   int,       --13
            not_valid:  int);      --14
}
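The call above hands the UDF only a plain string, 'ts:1, lon:6, lat:7'. The UDF body isn't shown, so the following is a minimal sketch, assuming the string is a comma-separated list of name:position pairs, of how it could be turned into the pivot_schema dictionary that the UDF indexes with. parse_pivot_schema is a hypothetical helper name, not part of Pig.

```python
def parse_pivot_schema(spec):
    """Turn a partial schema description such as 'ts:1, lon:6, lat:7'
    into a dict mapping field name -> tuple position (hypothetical helper)."""
    schema = {}
    for entry in spec.split(','):
        entry = entry.strip()
        if not entry:
            continue  # tolerate empty entries, e.g. a trailing comma
        name, sep, position = entry.partition(':')
        if not sep:
            raise ValueError("expected 'name:position', got %r" % entry)
        schema[name.strip()] = int(position)
    return schema


# Inside the UDF, this would run on the string argument passed from the script.
pivot_schema = parse_pivot_schema('ts:1, lon:6, lat:7')
print(pivot_schema['lon'])  # prints 6
```

Parsing once per call keeps the positions in one place (the Pig script), so the UDF body only ever refers to field names.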
As you can see, the relation routePivots has an explicit schema.
I'm invoking the UDF filter_route_pivots on a bag of tuples with the
routePivots schema, and I'm passing a partial schema description
('ts:1, lon:6, lat:7') to the UDF.
I do this because I don't want to work with numbered tuple fields. They are a
great way to introduce silly bugs, and those bugs are hard to catch. I know.
Inside the UDF, I access tuple values by name:

    ts_diff = next_pivot[pivot_schema['ts']] - pivot[pivot_schema['ts']]
    distance = RouteCalculator.getDistanceInMeters(pivot[pivot_schema['lon']],
                                                   pivot[pivot_schema['lat']],
                                                   next_pivot[pivot_schema['lon']],
                                                   next_pivot[pivot_schema['lat']])

As you can see, I don't use magic numbers: my code is easier to read and
maintain, and I'm not tightly coupled to the schema and field order.
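Because of the nested ORDER ... BY ts, the UDF sees each subscriber's pivots in time order, so named access fits naturally into a walk over neighbouring pairs. The sketch below assumes that this is how filter_route_pivots pairs pivot with next_pivot (the UDF body isn't shown) and hard-codes the positions from the 'ts:1, lon:6, lat:7' description. It also swaps in a plain haversine formula (hypothetical distance_in_meters) for RouteCalculator.getDistanceInMeters, whose implementation is not shown; consecutive_steps is likewise a hypothetical name.

```python
import math

# Field positions from the partial description 'ts:1, lon:6, lat:7'.
PIVOT_SCHEMA = {'ts': 1, 'lon': 6, 'lat': 7}

EARTH_RADIUS_M = 6371000.0  # mean Earth radius in metres


def distance_in_meters(lon1, lat1, lon2, lat2):
    """Haversine great-circle distance; a stand-in for
    RouteCalculator.getDistanceInMeters (same lon, lat, lon, lat order)."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = phi2 - phi1
    d_lambda = math.radians(lon2 - lon1)
    a = (math.sin(d_phi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2)
    # min() guards against rounding pushing the argument slightly above 1.
    return 2 * EARTH_RADIUS_M * math.asin(min(1.0, math.sqrt(a)))


def consecutive_steps(ordered_pivots, pivot_schema):
    """Return (ts_diff, distance) for each pair of neighbouring pivots.
    Assumes the bag is already sorted by ts, as the nested ORDER does."""
    pivots = list(ordered_pivots)  # accept any iterable of tuples
    steps = []
    for pivot, next_pivot in zip(pivots, pivots[1:]):
        ts_diff = next_pivot[pivot_schema['ts']] - pivot[pivot_schema['ts']]
        distance = distance_in_meters(pivot[pivot_schema['lon']],
                                      pivot[pivot_schema['lat']],
                                      next_pivot[pivot_schema['lon']],
                                      next_pivot[pivot_schema['lat']])
        steps.append((ts_diff, distance))
    return steps


# Two made-up pivots, 60 s and one degree of latitude apart; other fields are placeholders.
example_bag = [
    (0, 1000, 0, 0, 'x', 0, 0.0, 0.0),
    (0, 1060, 0, 0, 'x', 0, 0.0, 1.0),
]
print(consecutive_steps(example_bag, PIVOT_SCHEMA))  # about 60 s and 111.2 km
```

Only the schema dictionary knows where ts, lon and lat live, so reordering the projection means changing one string in the script rather than every index in the UDF.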

Does Pig provide a way to push the schema down to a UDF? I've already declared
it, so why can't I use it inside my UDF?
I think such an approach would be very useful.
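One workaround, offered as an assumption and not as a Pig feature, is to pass the UDF the same flat field list that the script declares for routePivots and derive every position from it, instead of maintaining a hand-written 'ts:1, lon:6, lat:7' string. The sketch assumes a flat schema: it splits on commas, so nested tuple or bag types are not supported. index_map_from_declaration and ROUTE_PIVOTS_FIELDS are hypothetical names.

```python
# Flat field list, copied in declaration order from the routePivots projection.
ROUTE_PIVOTS_FIELDS = ('msisdn: long, ts: long, lac, cid, cell_type, branch_id, '
                       'center_lon: double, center_lat: double, tile_id: int, '
                       'zone_col: int, zone_row: int, is_active')


def index_map_from_declaration(declaration):
    """Build {field_name: position} from a flat Pig-style field list.
    Types after ':' are optional and ignored; nested types are not handled."""
    mapping = {}
    for position, field in enumerate(declaration.split(',')):
        name = field.split(':', 1)[0].strip()
        if not name:
            raise ValueError('empty field name at position %d' % position)
        if name in mapping:
            raise ValueError('duplicate field name %r' % name)
        mapping[name] = position
    return mapping


pivot_schema = index_map_from_declaration(ROUTE_PIVOTS_FIELDS)
print(pivot_schema['center_lon'])  # prints 6
```

With this mapping the UDF would look up 'center_lon' and 'center_lat' rather than 'lon' and 'lat'. The trade-off is that the field list still has to be kept in sync with the script by hand; it just moves the duplication from individual indexes to one string.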