That's because you used GROUP ... ALL, which puts every record into a
single group, and a single group can only go to one reducer.
What if you group into some large-enough number of buckets instead?
A = LOAD 'records.txt' USING PigStorage('\t') AS (recordId:int);
A_PRIME = FOREACH A GENERATE *, ROUND(RANDOM() * 1000) AS bucket;
B = GROUP A_PRIME BY bucket PARALLEL $parallelism;
SPLITS = FOREACH B GENERATE FLATTEN(BagSplit(50,A_PRIME));
COMPLETE_RCORDS = FOREACH SPLITS GENERATE FLATTEN(MyCustomUDF($0));
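
If it helps to see why the buckets spread the work out, here is a rough Python sketch of the same idea (not Pig, and not what Pig does internally). The function name bucket_and_batch, its parameters, and the fixed seed are made up for illustration. It gives each record a random bucket, groups by bucket, and cuts each group into batches of at most 50, which is roughly what the GROUP BY bucket plus BagSplit steps do.

```python
import random
from collections import defaultdict


def bucket_and_batch(record_ids, num_buckets=1000, batch_size=50, seed=0):
    """Assign each record a random bucket, then split each bucket into batches.

    Hypothetical helper: it only imitates the shape of the Pig script
    (random bucket -> group by bucket -> BagSplit), not Pig itself.
    """
    rng = random.Random(seed)
    buckets = defaultdict(list)
    for rid in record_ids:
        # Analogous to ROUND(RANDOM() * 1000): keys fall in 0..num_buckets.
        key = round(rng.random() * num_buckets)
        buckets[key].append(rid)

    batches = []
    for key in sorted(buckets):
        group = buckets[key]
        # Analogous to BagSplit(50, ...): chunks of at most batch_size records.
        for start in range(0, len(group), batch_size):
            batches.append(group[start:start + batch_size])
    return buckets, batches


if __name__ == "__main__":
    buckets, batches = bucket_and_batch(range(20000))
    print(len(buckets), "buckets,", len(batches), "batches")
```

With two million records and about 1000 buckets, each bucket holds about 2000 records, so about 40 batches of 50 per bucket, and the buckets can be handed to different reducers. One thing to watch in the Pig script: the bags passed to BagSplit also carry the bucket field unless you project it away first.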
On Mon, Sep 3, 2012 at 9:32 AM, James Newhaven <[EMAIL PROTECTED]> wrote:
> I'd appreciate it if anyone has some ideas/pointers regarding a pig script
> and custom UDF I have written. I've found it runs too slowly on my hadoop
> cluster to be useful.
> I have two million records inside a single 600MB file.
> For each record, I need to query a web service to retrieve additional data
> for this record.
> The web service supports batch requests of up to 50 records.
> I split the two million records into bags of 50 items (using the datafu
> BagSplit UDF) and then pass each bag on to a custom UDF I have written that
> processes each bag and queries the web service.
> I noticed when my script reaches my UDF, only one reducer is used and the
> job takes forever to complete (in fact it has never finished since I
> terminate it after a few hours).
> My script looks like this:
> A = LOAD 'records.txt' USING PigStorage('\t') AS (recordId:int);
> B = GROUP A ALL;
> SPLITS = FOREACH B GENERATE Flatten(BagSplit(50,A));
> COMPLETE_RCORDS = FOREACH SPLITS GENERATE FLATTEN(MyCustomUDF($0));