Pig >> mail # user >> Multithreaded UDF


Re: Multithreaded UDF
Assuming the 1-5 seconds is mainly spent waiting on I/O, using more mappers or
reducers might not be suitable, since it would just take up too many mapper and
reducer slots. A couple of options:

1. Use streaming: you have full control over how many you handle at a time.
Might be tricky to pass the URL content through, though.

2. A hack: say you want to handle 1000 URLs at a time. Write a simple loader
that extends PigStorage(), where getNext() looks something like:
   { DataBag bag = ...;
     for (int i = 0; i < 1000; i++) {
        Tuple tuple = super.getNext();
        if (tuple == null) break;
        bag.add(tuple);
     }
     return bag.size() > 0 ? bag : null;
   }
and your UDF takes a bag of tuples and returns a bag of tuples.
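A minimal, self-contained sketch of that batching idea, with a plain Iterator standing in for the Pig reader (the class name `BatchReader`, the generic element type, and `BATCH_SIZE` are illustrative assumptions, not Pig API):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class BatchReader {
    static final int BATCH_SIZE = 1000;

    // Drains up to BATCH_SIZE items from the underlying reader into a "bag",
    // mirroring the getNext() override above. Returns null when the reader
    // is exhausted, just as the loader returns null to end its split.
    static <T> List<T> nextBatch(Iterator<T> reader) {
        List<T> bag = new ArrayList<>();
        for (int i = 0; i < BATCH_SIZE && reader.hasNext(); i++) {
            bag.add(reader.next());
        }
        return bag.isEmpty() ? null : bag;
    }

    public static void main(String[] args) {
        // 2500 rows should come out as batches of 1000, 1000, and 500.
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 2500; i++) rows.add(i);
        Iterator<Integer> reader = rows.iterator();
        List<Integer> batch;
        while ((batch = nextBatch(reader)) != null) {
            System.out.println("batch of " + batch.size());
        }
    }
}
```

The UDF then sees one bag per call and can fan its 1000 fetches out however it likes.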

Raghu.
On Wed, Nov 9, 2011 at 9:12 AM, Jonathan Coveney <[EMAIL PROTECTED]> wrote:

> I don't get how this would be a win. Let's imagine you have a system that
> you're fully saturating with map tasks, such that you have, say, 50
> available CPUs (after the task tracker, job tracker, etc.) and you send your
> job to 50 mappers... how is this different from 25 mappers with 2 threads
> apiece? I guess it depends on whether or not each task spends its 1 to 5
> seconds blocking on some action. I guess you could enqueue all the
> URL fetches, and then have another thread process that. Either way, the
> semantics for such a UDF would be awkward and run counter to the typical
> m/r use case, imho. However, if you wanted to do something like this (and
> assuming that you want to avoid waiting a bunch on some blocking I/O),
> what you could do would be to make an accumulator UDF, but then do a group
> all. So you do:
>
> customers       = LOAD 'hdfs://node1.c.foundation.local/data/customers.csv'
>                   USING PigStorage(',')
>                   AS (id:chararray, name:chararray, url:chararray);
>
> fetchResults    = FOREACH customers
>                   GENERATE id, name, url, fetchHttp(url);
>
> fetchResults    = FOREACH (GROUP customers ALL)
>                   GENERATE customers.id, customers.name,
>                   fetchHttp(customers.url);
>
> this would cause the accumulator to be invoked, and you could just enqueue
> the elements of the input bag that you get and fire up a thread that begins
> fetching, and then once it is empty, begin processing the results of the
> fetch.
>
> note: that's pure theory and I don't know if it would actually be
> performant, but you could do it :)
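A sketch of the enqueue-then-process idea the accumulator would enable, using plain `java.util.concurrent` (the class name `ParallelFetch` is an assumption, and `fetchHttp` here is a stand-in that only simulates the blocking call):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelFetch {
    // Stand-in for the real HTTP fetch; here it just simulates I/O latency.
    static String fetchHttp(String url) throws InterruptedException {
        Thread.sleep(50); // pretend this is the 1-5s blocking call
        return "content-of:" + url;
    }

    // accumulate() would enqueue each tuple's URL; getValue() then collects
    // the results, so the blocking waits overlap instead of running serially.
    static List<String> fetchAll(List<String> urls, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<String>> inFlight = new ArrayList<>();
        for (String url : urls) {
            inFlight.add(pool.submit(() -> fetchHttp(url)));
        }
        List<String> results = new ArrayList<>();
        for (Future<String> f : inFlight) {
            results.add(f.get()); // blocks until that fetch completes
        }
        pool.shutdown();
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fetchAll(List.of("http://a", "http://b", "http://c"), 3));
    }
}
```

With 3 threads, the three simulated 50ms fetches finish in roughly one fetch's worth of wall time rather than three.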
>
> If you're not waiting on a bunch of I/O, though, I don't see the gain. If
> you have 1-5s of actual work to do per URL (not just waiting on the results
> of some long operations), then making it asynchronous won't change that.
>
> 2011/11/9 Daan Gerits <[EMAIL PROTECTED]>
>
> > I expect you are talking about the 1-5 second delay I talked about. What I
> > actually meant was that the code within the exec function of the UDF is
> > taking 1 to 5 seconds for each invocation. That's something I cannot change,
> > since the fetch method is actually doing a lot more than only fetching
> > something. I cannot push the additional logic the fetching is invoking
> > higher, since that would break the algorithm.
> >
> >
> > On 09 Nov 2011, at 16:05, Marek Miglinski wrote:
> >
> > > Something is wrong with your calculation UDF; rethink it. I once had to
> > > calculate the efficiency of data sent/downloaded by each user, and even
> > > though the logic there was quite complex, the speed was ~0.02s per user.
> > > Each user had ~500 transactions, so overall that's ~0.00004s per tx.
> > >
> > > Example of the code:
> > > userGroup = GROUP recordTx BY user PARALLEL 100;
> > > userFlattened = FOREACH userGroup {
> > >       generated = Merge(recordTx);
> > >       GENERATE FLATTEN(generated);
> > > }
> > >
> > >
> > > Sincerely,
> > > Marek M.
> > > ________________________________________
> > > From: Daan Gerits [[EMAIL PROTECTED]]
> > > Sent: Wednesday, November 09, 2011 4:19 PM
> > > To: [EMAIL PROTECTED]
> > > Subject: Re: Multithreaded UDF
> > >
> > > Hi Marek,