
Re: creating a graph over time
Hey Jon,

Your windowing UDF will be very useful outside of this particular use case.
It would be great if you could contribute it to PiggyBank.


On Tue, Nov 1, 2011 at 10:44, Jonathan Coveney <[EMAIL PROTECTED]> wrote:

> Okie dokie. So first, let's clarify and simplify the problem a little,
> especially to ensure that I know what is going on.
> Let's first just focus on a particular class. This is ok since presumably
> each class is independent. Now, we have user_id, start_time, and end_time
> (start_time+duration). If I understand correctly, a user_id should be
> included up to end_time+30s, since this is a 30s moving window. As such,
> we'll just ignore that side of things for now, because you can just
> transform people's end times accordingly (add 30s to each). Further, the assumption is that
> for a given user_id, you will not have overlapping start and end
> times... you can have multiple entries, i.e. "user 1, start 1, end 3; user 1,
> start 5, end 7;" but you can't have them in this form: "user 1, start 1,
> end 3; user 1, start 2, end 4."
> So we have simplified the question to this: given: user_id, start_time,
> and end_time (which never overlap), how can I get a count of unique users
> for every second? So now we will design a UDF to generate that output as a
> bag of (time, # of people) pairs, for every second from min(start_time) to
> max(end_time). The UDF will accept a bag sorted on the start time. Now, as
> I write it it's going to be a simple evalfunc, but it should be an
> accumulator. It's easy to make the transition.
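To make the desired output concrete, here is a brute-force check on a toy dataset (the data and the [start, end) active-interval convention are assumptions for illustration, not from the thread):

```python
# Toy dataset (assumed): (start_time, end_time) per user session,
# with a user counted as active on [start, end).
events = [(1, 3), (2, 5), (6, 8)]

t_min = min(s for s, _ in events)
t_max = max(e for _, e in events)

# For every second, count how many sessions cover it.
counts = [(t, sum(1 for s, e in events if s <= t < e))
          for t in range(t_min, t_max)]
print(counts)  # [(1, 1), (2, 2), (3, 1), (4, 1), (5, 0), (6, 1), (7, 1)]
```

This quadratic scan is only a reference for what the UDF should emit; the point of the priority-queue approach below it is to get the same pairs in a single pass over the sorted bag.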
> Here is what you do. Initialize a PriorityQueue. The natural ordering for
> int and long is fine, as it will ensure that when we poll it, we'll get the
> earliest end time, which is what we want.
> So step one is to pull the first tuple, and get the start_time and
> end_time. The start time will set our time to start_time (which is
> min(start_time) since it was sorted on start_time), and we add the end_time
> to the priority queue. We have a counter "uniques" which we increment.
> Now, before we actually increment, we grab the next tuple. Why do
> this instead of going to the next end time? Because we don't know if
> someone starts in between now and the next end time. So we grab the tuple
> and get its start and end time. Now there are two cases.
> Case 1: the start time is less than the head of the priority queue, via a
> peek. If this is the case, then we can safely increment up to the
> start_time we just got, and then go from there. This is because it's
> impossible for there to be a new end_time less than the start_time we just
> got, because they are ordered by start_time and end_time>start_time. So we
> add the new end_time, and then we increment our timer until we get to the
> new start_time we just got, and add (timer,unique) at each step. When we
> get to start_time, we unique++. Now we get the next tuple and repeat.
> Case 2: the start time comes after the head of the priority queue, via a
> peek. If this is the case, then we need to increment up to the current
> head, emitting (timer,unique). Then when we get to the time_value equal to
> that end_time, we unique--, and check again if the start_time comes before
> the head of the priority queue. Until it does, we repeat step 2. Once
> it does, we do step 1.
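The sweep described above can be sketched in plain Python (a rough stand-in for the Java EvalFunc, with heapq playing the role of the PriorityQueue; end times are treated as exclusive, matching the unique-- at end_time). It ticks second by second rather than jumping tuple to tuple, but it emits the same (time, count) pairs:

```python
import heapq

def window_counts(events):
    """events: (start, end) pairs sorted by start; a user counts on [start, end).
    Returns (second, unique_count) for every second from min start to max end."""
    if not events:
        return []
    active = []                      # min-heap of end times (the PriorityQueue)
    out = []
    i, n = 0, len(events)
    t = events[0][0]                 # min(start_time), since input is sorted
    t_max = max(end for _, end in events)
    while t < t_max:
        while active and active[0] <= t:
            heapq.heappop(active)    # a window just ended: unique--
        while i < n and events[i][0] <= t:
            heapq.heappush(active, events[i][1])  # a window started: unique++
            i += 1
        out.append((t, len(active)))
        t += 1
    return out

print(window_counts([(1, 3), (2, 5), (6, 8)]))
# [(1, 1), (2, 2), (3, 1), (4, 1), (5, 0), (6, 1), (7, 1)]
```

The heap's natural ordering guarantees that peek/poll always yields the earliest outstanding end time, which is exactly the invariant cases 1 and 2 rely on.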
> I've attached a crude, untested UDF that does this. Buyer beware. But it
> shows the general flow, and should be better than exploding the data (I
> really hate exploding data like that unless it's absolutely necessary).
> To use, generate some data, then...
> register window.jar;
> define window com.jcoveney.Window('30');
> a = load 'data' using PigStorage(',') as (uid:long,start:long,end:long);
> b = foreach (group a all) {
>   ord = order a by start asc;
>   generate flatten(window(ord));
> }
> dump b;
> To generate data, I first did a small subsample just to think about
> it, then I did (in python)
> import random
> f=open("data","w")