Re: Iterating over data set
Hi Xuri,

This illustrates the use case for a UDF I've had to implement in one form or another, called 'FilterBag'. It's essentially just Pig's builtin FILTER applied to a bag inside a nested foreach, and it would work like so (using your pseudocode):
A = load 'input' as (timestamp, worker, output);

-- Assuming you want to restrict each calculation to a day. ToDay is most likely going to be Piggybank's
-- ISOToDay truncation UDF.
with_day = foreach A generate timestamp, ToDay(timestamp) as day, worker, output;

-- First you'll have to get all output for a given worker on a given day into single bag
worker_output = foreach (group with_day by (worker, day)) {
                          -- this relation (worker_output) will have one tuple per unique worker, day, and timestamp
                          timestamps = distinct with_day.timestamp;
                          generate
                            flatten(group)               as (worker, day),
                            flatten(timestamps)          as t1,
                            with_day.(timestamp, output) as outputs; -- a bag that contains all of this worker's outputs and their timestamps for this day
};

-- Next, filter each "outputs" bag to contain only outputs that occurred within a 10 minute (or whatever time unit of interest) window from the
-- timestamp, looking forward (whether you look forward, back, or both is up to you)
windowed = foreach worker_output {
                    -- FilterBag(bag, field_num, comparison_string, to_compare)
                    -- bag: the bag to filter
                    -- field_num: 0-indexed field number of the tuples in the bag to use for comparison to "to_compare"
                    -- comparison_string: one of 'lt', 'lte', 'e', 'gte', 'gt', corresponding to less than, less than or equal to, and so on
                    -- to_compare: the object to compare to

                    outputs_after    = FilterBag(outputs, 0, 'gte', t1);
                    outputs_windowed = FilterBag(outputs_after, 0, 'lt', t1 + $TIME_UNIT);
                    -- what we WANT to do is this:
                    -- outputs_windowed = filter outputs by timestamp >= t1 and timestamp < t1 + $TIME_UNIT;
                    -- but I have never been able to make Pig happy with this, thus FilterBag.
                    generate worker, day, t1, SUM(outputs_windowed.output) as summed_output, COUNT(outputs_windowed) as count;
};

dump windowed;

Notice that you'll have one record for each worker and timestamp that was actually measured. You'll have to do something fancier if you want smoothing (e.g. a record for timestamps where no data was recorded).

Importantly, it would be fantastic to be able to do this without a UDF, using just Pig's filter command as shown in the comments above. However, I've tried this several different ways and never gotten Pig to accept it, which is why I wrote the FilterBag UDF. Perhaps another Pig user can illuminate the situation better?

I'll see about publishing a simple version of FilterBag if it seems the Pig community would use it.
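
In the meantime, here's a rough sketch of the idea (not my actual implementation): it assumes the field being compared and the value it's compared against are both numeric (e.g. epoch seconds), and the class naming and null handling are just placeholders.

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

public class FilterBag extends EvalFunc<DataBag> {

    private static final BagFactory BAG_FACTORY = BagFactory.getInstance();

    // Expected input tuple: (bag, field_num, comparison_string, to_compare)
    @Override
    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() != 4 || input.get(0) == null) {
            return null;
        }
        DataBag bag = (DataBag) input.get(0);
        int fieldNum = ((Number) input.get(1)).intValue();
        String op = (String) input.get(2);
        double rhs = ((Number) input.get(3)).doubleValue();

        DataBag result = BAG_FACTORY.newDefaultBag();
        for (Tuple t : bag) {
            Object field = t.get(fieldNum);
            if (field == null) {
                continue;   // skip tuples with no value in the comparison field
            }
            double lhs = ((Number) field).doubleValue();
            boolean keep;
            if ("lt".equals(op)) {
                keep = lhs < rhs;
            } else if ("lte".equals(op)) {
                keep = lhs <= rhs;
            } else if ("e".equals(op)) {
                keep = lhs == rhs;
            } else if ("gte".equals(op)) {
                keep = lhs >= rhs;
            } else if ("gt".equals(op)) {
                keep = lhs > rhs;
            } else {
                throw new IOException("Unknown comparison: " + op);
            }
            if (keep) {
                result.add(t);
            }
        }
        return result;
    }
}

Register whatever jar it ends up in and it can be called exactly as in the script above, e.g. FilterBag(outputs, 0, 'gte', t1).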

On Jul 28, 2013, at 8:34 PM, Xuri Nagarin wrote:

> Hi,
> Let's say I have a data set of units of output per worker per second that's
> in chronological order for a whole day
> Example:
> 2013-07-26T14:00:00, Joe, 50
> 2013-07-26T14:10:00, Jane, 60
> 2013-07-26T14:15:00, Joe, 55
> 2013-07-26T14:20:00, Jane, 60
> I create the data set above by loading a larger data set and getting these
> three attributes in a relation.
> Now, I want to count output per user per unit of time period, say every ten
> minutes but as a rolling count with a window that moves by the minute. The
> pseudo-code would be something along the lines of:
> -----------xxxxxxxxxxxxxxxxx------------