Re: Pig Unique Counts on Multiple Subsets of a Large Input
Hi Thomas

It looks like you are trying to count unique users for multiple combinations of dimensions. If so, then you can make use of the CUBE operator. Since counting unique users is not an algebraic measure (it is a holistic measure), it may result in a very slow reducer. There is a JIRA which addresses this issue: https://issues.apache.org/jira/browse/PIG-2831. If this is what you are looking for, then I will look into this JIRA as soon as possible.

Thanks
-- Prasanth
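
For reference, a minimal sketch of the kind of CUBE query described above, assuming the field names used later in this thread (dimA, dimB, userId, activity) and made-up input/output paths; the nested DISTINCT still carries the holistic cost mentioned above, and CUBE needs Pig 0.11 or later:

source = LOAD 'input' AS (dimA:chararray, dimB:chararray, userId:long, activity:chararray);
cubed  = CUBE source BY CUBE(dimA, dimB);   -- one group per (dimA, dimB) combination, plus rollups
counts = FOREACH cubed {
    uniq = DISTINCT cube.userId;            -- distinct users per group is de-duplicated in the reducer
    GENERATE FLATTEN(group), COUNT(uniq) AS unique_users;
};
STORE counts INTO 'output';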

On May 6, 2013, at 10:03 AM, Thomas Edison <[EMAIL PROTECTED]> wrote:

> @Alan
> I just tried your method as shown below.  The script is stuck at the last
> reducer even for a relatively small set of the data and fewer combinations.  I
> suspect it's an out-of-memory issue.  If I remember correctly, using a
> nested foreach to calculate the unique counts is not a very good idea.  Any
> suggestions?  Thanks.
>
> T.E.
>
> source = load ...;
> A = foreach source generate dimA, dimB, userId,
>    udf.getActivity1UserId(userId, activity) as activity1_userId,
>    udf.getActivity2UserId(userId, activity) as activity2_userId,
>    udf.getActivity3UserId(userId, activity) as activity3_userId,
>    ...
>    udf.getActivity10UserId(userId, activity) as activity10_userId;
>
> B = group A by (dimA, dimB);
>
> C = foreach B {
>    unique_activity1 = distinct A.activity1_userId;
>    unique_activity2 = distinct A.activity2_userId;
>    unique_activity3 = distinct A.activity3_userId;
>    ...
>    unique_activity10 = distinct A.activity10_userId;
>    generate FLATTEN(group), COUNT(unique_activity1),
> COUNT(unique_activity2), COUNT(unique_activity3), ...,
> COUNT(unique_activity10);
> }
>
> STORE C...;
>
>
> On Mon, May 6, 2013 at 8:41 AM, Thomas Edison <
> [EMAIL PROTECTED]> wrote:
>
>> Thanks for the reply.
>>
>> @Jonathan,
>> I haven't worked with CUBE before.  I will try to learn it.  Thanks for
>> the tip.
>> Currently, to split the activity, I use something like this.
>>
>> new_relation = FILTER relation BY activity == 'abc' or activity == 'def';
>>
>> In some cases, it is a one-to-one mapping, but not always.  To my
>> understanding, the SPLIT keyword does exactly the same thing as the way I'm
>> doing it, correct?
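
For what it's worth, a sketch of the same filters written as a single SPLIT, reusing the relation and activity values from the FILTER above (the second output name, abc_only, is just illustrative). SPLIT conditions are not mutually exclusive, so a record matching several conditions lands in several output relations, which is the same behaviour as running the FILTER statements separately:

SPLIT relation INTO
    new_relation IF (activity == 'abc' OR activity == 'def'),
    abc_only     IF (activity == 'abc');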
>>
>> @Alan,
>> I haven't tried your method.  I didn't come up with the UDF way until I
>> saw my old script was taking too much time in the map phase - scanning the
>> source multiple times.  I will try your method.  I also attached my old
>> code at the end, just in case.
>>
>> I set my reducer count to about 90% of my reducer cap.  I think this is what is
>> recommended.
>>
>> It takes about 10-15 waves.
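
As a side note, a sketch of the two knobs for reduce parallelism in Pig (the numbers here are hypothetical): SET default_parallel applies to every reduce-side operator in the script, and a PARALLEL clause overrides it for a single statement.

SET default_parallel 100;                   -- script-wide default number of reducers
C = GROUP B BY (dimA, dimB) PARALLEL 120;   -- per-operator override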
>>
>> My old script:
>> source = load ...;
>>
>> activity_1 = FILTER source BY activity == 'abc' OR activity == 'def';
>> A_1 = foreach activity_1 generate dimA, dimB, userId;
>> B_1 = distinct A_1;
>> C_1 = group B_1 by (dimA, dimB);
>> D_1 = foreach C_1 generate FLATTEN(group), COUNT(B_1);
>> STORE...
>>
>> -- repeat for activity_1, but for other dimension combinations;
>>
>>
>> activity_2 = FILTER source BY activity == 'abc';
>> -- repeat whatever activity_1 has been done
>>
>> -- repeat other activities.
>>
>> Thanks.
>>
>> T.E.
>>
>>
>> On Mon, May 6, 2013 at 8:12 AM, Alan Gates <[EMAIL PROTECTED]> wrote:
>>
>>> In the script you gave I'd be surprised if it's spending time in the map
>>> phase, as the map should be very simple.  It's the reduce phase I'd expect
>>> to be very expensive because your mapping UDF prevents Pig from using the
>>> algebraic nature of count (that is, it has to ship all of the records to
>>> reduce not just the number of records).  If your file is large this will be
>>> expensive.  What happens if you switch your script to:
>>>
>>> A = load ...
>>> B = foreach A generate dimA, dimB, udf.newUserIdForCategory1(userId,
>>> activity) as userId1, ...
>>> C = group B by (dimA, dimB)
>>> D = foreach C generate flatten(group), COUNT(B.userId1), ...
>>>
>>> When you said it was taking a long time in the map phase were you trying