Re: creating a graph over time
Hi Guys,

Sorry for joining this discussion so late.  I would suggest using
interval trees for dealing with overlapping time intervals.
There is a fairly nice treatment of interval trees in CLR, sect. 14.3.
The data structure is essentially a red-black tree in which each node
also stores the maximum endpoint found in its subtree, and I surmise
that one could extend java.util.TreeMap to implement it.
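
To make the idea concrete, here is a minimal sketch of the augmentation,
not a finished implementation. It assumes half-open intervals
[start, end) and uses a plain unbalanced binary search tree keyed on
start, so it leaves out the red-black balancing that the textbook version
relies on for logarithmic height. The names IntervalTreeSketch, insert and
countContaining are made up for illustration.

```java
class IntervalTreeSketch {
    private static final class Node {
        final long start, end;   // interval is [start, end), an assumption of this sketch
        long maxEnd;             // largest end in the subtree rooted here
        Node left, right;
        Node(long start, long end) {
            this.start = start;
            this.end = end;
            this.maxEnd = end;
        }
    }

    private Node root;

    void insert(long start, long end) {
        root = insert(root, start, end);
    }

    // Ordinary BST insert keyed on start; maxEnd is refreshed on the way down.
    // No rebalancing is done, so sorted input degrades to a linked list.
    private static Node insert(Node n, long start, long end) {
        if (n == null) return new Node(start, end);
        if (start < n.start) n.left = insert(n.left, start, end);
        else n.right = insert(n.right, start, end);
        n.maxEnd = Math.max(n.maxEnd, end);
        return n;
    }

    // Number of stored intervals that contain time t.
    int countContaining(long t) {
        return count(root, t);
    }

    private static int count(Node n, long t) {
        // Prune: every interval in this subtree has already ended at time t.
        if (n == null || n.maxEnd <= t) return 0;
        int c = count(n.left, t);
        // Everything in the right subtree starts at or after n.start,
        // so it can only contain t if n.start <= t.
        if (n.start <= t) {
            if (t < n.end) c++;
            c += count(n.right, t);
        }
        return c;
    }

    public static void main(String[] args) {
        IntervalTreeSketch tree = new IntervalTreeSketch();
        tree.insert(1, 3);
        tree.insert(2, 5);
        tree.insert(7, 8);
        for (long t = 0; t <= 8; t++) {
            System.out.println(t + "\t" + tree.countContaining(t));
        }
    }
}
```

The maxEnd field is what lets a query skip whole subtrees, which is the
part a sorted map alone does not give you.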


On Tue, Nov 1, 2011 at 1:44 PM, Jonathan Coveney <[EMAIL PROTECTED]> wrote:
> Okie dokie. So first, let's clarify and simplify the problem a little,
> especially to ensure that I know what is going on.
> Let's first just focus on a particular class. This is ok since presumably
> each class is independent. Now, we have user_id, start_time, and end_time
> (start_time+duration). If I understand correctly, a user_id should be
> included up to end_time+30s, since this is a 30s moving window. As such,
> we'll just ignore that side of things for now, because you can just
> transform people's end times accordingly. Further, the assumption is that
> for a given user_id, you will not have overlapping start and end times...you
> can have multiple entries, ie "user 1, start 1, end 3; user 1, start 5, end
> 7;" but you can't have them in this form: "user 1, start 1, end 3; user 1,
> start 2, end 4."
> So we have simplified the question to this: given: user_id, start_time, and
> end_time (which never overlap), how can I get a count of unique users for
> every second? So now we will design a UDF to generate that output as a bag
> of (time, # of people) pairs, for every second from min(start_time) to
> max(end_time). The UDF will accept a bag sorted on the start time. As I
> write it, it will be a simple EvalFunc, but it really should be an
> Accumulator. The transition is easy to make.
> Here is what you do. Initialize a PriorityQueue. The natural ordering for
> int and long is fine, as it will ensure that when we poll it, we'll get the
> earliest end time, which is what we want.
> So step one is to pull the first tuple, and get the start_time and end_time.
> The start time will set our timer to start_time (which is min(start_time)
> since it was sorted on start_time), and we add the end_time to the priority
> queue. We have a counter "uniques" which we increment.
> Now, before we actually advance the timer, we grab the next tuple. Why do
> this instead of going to the next end time? Because we don't know if someone
> starts in between now and the next end time. So we grab the tuple and get
> its start and end time. Now there are two cases.
> Case 1: the start time is less than the head of the priority queue, via a
> peek. If this is the case, then we can safely increment up to the start_time
> we just got, and then go from there. This is because it's impossible for
> there to be a new end_time less than the start_time we just got, because
> they are ordered by start_time and end_time>start_time. So we add the new
> end_time, and then we increment our timer until we get to the new start_time
> we just got, emitting (timer,uniques) at each step. When we get to start_time,
> we uniques++. Now we get the next tuple and repeat.
> Case 2: the start time comes after the head of the priority queue, via a
> peek. If this is the case, then we need to increment up to the current head,
> emitting (timer,uniques). Then when the timer reaches that end_time, we poll
> it, uniques--, and check again whether the start_time comes before the head
> of the priority queue. Until it does, we repeat case 2. Once it does, we do
> case 1.
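
For what it's worth, the two cases above fold into a single loop. Below is
a minimal Java sketch of that sweep. It is not the attached UDF and it does
not use the Pig API: sessions are plain long[]{start, end} pairs, and the
names WindowSweepSketch and countPerSecond are made up for illustration.
It assumes the input is already sorted by start, that a session is active
during the half-open range [start, end + window), and that output stops
once the last session has expired. Note that it counts active sessions,
which equals unique users only if one user's extended intervals never
overlap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

class WindowSweepSketch {
    // Returns {time, count} pairs for every second from the first start
    // until the last session has expired. Input must be sorted by start.
    static List<long[]> countPerSecond(List<long[]> sessions, long window) {
        List<long[]> out = new ArrayList<>();
        if (sessions.isEmpty()) return out;

        // Natural ordering of Long gives a min-heap: peek() is the earliest end.
        PriorityQueue<Long> ends = new PriorityQueue<>();
        long timer = sessions.get(0)[0];
        long uniques = 0;

        for (long[] s : sessions) {
            long start = s[0];
            long end = s[1] + window;   // assumption: the window extends the end time
            // Advance the timer up to the new start, expiring sessions on the way.
            while (timer < start) {
                expire(ends, timer);
                out.add(new long[] {timer, ends.size()});
                timer++;
            }
            ends.add(end);
            uniques = ends.size();      // one more active session from this second on
        }

        // No more starts: keep emitting until every remaining session has ended.
        while (true) {
            expire(ends, timer);
            if (ends.isEmpty()) break;
            out.add(new long[] {timer, ends.size()});
            timer++;
        }
        return out;
    }

    // Drop every session whose (exclusive) end is at or before the timer.
    private static void expire(PriorityQueue<Long> ends, long timer) {
        while (!ends.isEmpty() && ends.peek() <= timer) {
            ends.poll();
        }
    }

    public static void main(String[] args) {
        List<long[]> sessions = Arrays.asList(
                new long[] {1, 3}, new long[] {2, 5}, new long[] {7, 8});
        for (long[] pair : countPerSecond(sessions, 0)) {
            System.out.println(pair[0] + "\t" + pair[1]);
        }
    }
}
```

Since every live session holds exactly one entry in the queue, the queue
size doubles as the "uniques" counter, so there is no separate ++ and --
to keep in sync.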
> I've attached a crude, untested UDF that does this. Buyer beware. But it
> shows the general flow, and should be better than exploding the data (I
> really hate exploding data like that unless it's absolutely necessary).
> To use, generate some data, then...
> register window.jar;
> define window com.jcoveney.Window('30');
> a = load 'data' using PigStorage(',') as (uid:long,start:long,end:long);
> b = foreach (group a all) {
>   ord = order a by start asc;
>   generate flatten(window(ord));