Pig >> mail # user >> Pig Unique Counts on Multiple Subsets of a Large Input

Re: Pig Unique Counts on Multiple Subsets of a Large Input
In the script you gave I'd be surprised if it's spending time in the map phase, as the map should be very simple.  It's the reduce phase I'd expect to be very expensive, because your mapping UDF prevents Pig from using the algebraic nature of COUNT (that is, it has to ship all of the records to the reducers, not just the per-group counts).  If your file is large this will be expensive.  What happens if you switch your script to:

A = load ...
B = foreach A generate dimA, dimB, udf.newUserIdForCategory1(userId, activity) as userId1, ...
C = group B by (dimA, dimB);
D = foreach C generate FLATTEN(group), COUNT(B.userId1), ...

When you said it was taking a long time in the map phase were you trying something like the above?  If so I'd check how long your UDF is taking.  Unless you're reading tons of data on a very small cluster the above should be very fast.  It definitely should not reread the input for each UDF.
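Filled out, that rewrite might look like the following sketch.  The input/output paths are hypothetical placeholders, the schema is taken from your question, and only two of the ten category UDFs are shown (the rest follow the same pattern):

```pig
-- Sketch of the suggested rewrite; paths are placeholders.
A = LOAD '/path/to/activity_log' AS (userId:chararray,
        dimensionA_key:chararray, dimensionB_key:chararray,
        dimensionC_key:chararray, activity:chararray);

-- Apply the category UDFs in the map phase.  Each returns the userId
-- when the activity belongs to that category and null otherwise.
-- COUNT skips nulls, so it stays algebraic and the combiner can
-- pre-aggregate on the map side.
B = FOREACH A GENERATE dimensionA_key, dimensionB_key,
        udf.newUserIdForCategory1(userId, activity) AS userId1,
        udf.newUserIdForCategory2(userId, activity) AS userId2;

C = GROUP B BY (dimensionA_key, dimensionB_key);
D = FOREACH C GENERATE FLATTEN(group), COUNT(B.userId1), COUNT(B.userId2);

STORE D INTO '/path/to/output';
```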

Other things to check:
What's your parallel count set at?  That is, how many reducers are you running?
How many waves of maps does this create?  That is, what's the number of maps this produces divided by the number of slots you get on your cluster to run it?
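On the parallelism question, the reducer count can be set script-wide or per operator; the value 20 below is purely illustrative:

```pig
-- Script-wide default reducer count (illustrative value).
SET default_parallel 20;

-- Or on an individual operator:
C = GROUP B BY (dimensionA_key, dimensionB_key) PARALLEL 20;
```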


On May 5, 2013, at 8:11 PM, Thomas Edison wrote:

> Hi there,
> I have a huge input on an HDFS and I would like to use Pig to calculate
> several unique metrics. To help explain the problem more easily, I assume
> the input file has the following schema:
> userId:chararray, dimensionA_key:chararray, dimensionB_key:chararray,
> dimensionC_key:chararray, activity:chararray, ...
> Each record represents an activity performed by that userId.
> Based on the value in the activity field, this activity record will be
> mapped to 1 or more categories. There are about 10 categories in total.
> Now I need to count the number of unique users for different dimension
> combinations (i.e. A, B, C, A+B, A+C, B+C, A+B+C) for each activity
> category.
> What would be the best practices to perform such calculation?
> I have tried several ways. Although I can get the results I want, it takes
> a very long time (i.e. days). What I found is most of the time is spent on
> the map phase. It looks like the script tries to load the huge input file
> every time it tries to calculate one unique count. Is there a way to
> improve this behavior?
> I also tried something similar to below, but it looks like it reaches the
> memory cap for a single reducer and just gets stuck at the last reducer step.
> source = load ... as (userId:chararray, dimensionA_key:chararray,
> dimensionB_key:chararray, dimensionC_key:chararray,
> activity:chararray, ...);
> a = group source by (dimensionA_key, dimensionB_key);
> b = foreach a {
>    userId1 = udf.newUserIdForCategory1(userId, activity);
>    -- this udf returns the original user id if the activity should be
> mapped to Category1 and None otherwise
>    userId2 = udf.newUserIdForCategory2(userId, activity);
>    userId3 = udf.newUserIdForCategory3(userId, activity);
>    ...
>    userId10 = udf.newUserIdForCategory10(userId, activity);
>    generate FLATTEN(group), COUNT(userId1), COUNT(userId2),
> COUNT(userId3), ..., COUNT(userId10);
> }
> store b ...;
> Thanks.
> T.E.