-
Jython UDFs, Tuples and Stringconversions
Björn-Elmar Macek 2012-10-01, 14:42
Hi,
I am currently writing a Pig script that works with bags of timestamp tuples. So I am basically working on a data structure like this: (tuple(chararray), int, bag{tuple(chararray)})
For example: ( ('a','b','c','d'), 3, {('2012-03-04 10:10:10'), ('2012-03-04 10:10:11')} )
When loading the data I add a schema, so Pig knows what data is coming in:
x = load 'tag_count_ts_pro_userpair' as (group:tuple(),cnt:int,times:bag{});
I then want to change the content of the times bag by replacing every timestamp with an integer based on its time distance to a certain date, which I do with the following UDFs:
###### myUDF.py ##############
from org.apache.pig.scripting import *
import datetime
import math


@outputSchema("days_from_start:bag{t:tuple(cnt:int)}")
def daysFromStart(startDate, aBagOfDates):
    if aBagOfDates is None: return None;
    result=[]
    for somedate in aBagOfDates:
        if somedate is None: continue
        aDateString = ''.join(somedate)
        #ALTERNATIVELY I USED ALSO: aDateString = ''.join(somedate[0]) // aDateString = ''.join(somedate[1])
        if len(aDateString==16): result.append(diffTime(startDate, aDateString))
    return result


@outputSchema("diff:int")
def diffTime(dateFrom,dateTil):
    dateSmall = datetime.datetime.strptime(dateFrom,"%Y-%m-%d %H:%M:%S");
    dateBig = datetime.datetime.strptime(dateTil,"%Y-%m-%d %H:%M:%S");
    delta = dateBig-dateSmall
    return delta.days

##########################
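The UDF above can be exercised outside Pig with plain Python as a quick check. This is a lightly corrected sketch under stated assumptions, not the poster's code: `outputSchema` below is a hypothetical stand-in for the decorator Pig supplies when a file is registered `using jython`, and each bag element is modelled as a 1-tuple whose first field is already a `str` holding the timestamp without surrounding quotes (i.e. the inner field is typed as chararray). Two changes from the original: the timestamp is read directly as `somedate[0]`, and the length check becomes `len(aDateString) == 19`, because `len(aDateString==16)` takes the length of a boolean (a `TypeError`) and a `%Y-%m-%d %H:%M:%S` timestamp has 19 characters, not 16.

```python
import datetime

# Hypothetical stand-in for the decorator Pig provides to Jython UDFs;
# it only lets this file run under plain Python.
def outputSchema(schema):
    def decorate(func):
        func.outputSchema = schema
        return func
    return decorate

FMT = "%Y-%m-%d %H:%M:%S"

@outputSchema("diff:int")
def diffTime(dateFrom, dateTil):
    # Whole days between the two timestamps (timedelta.days).
    dateSmall = datetime.datetime.strptime(dateFrom, FMT)
    dateBig = datetime.datetime.strptime(dateTil, FMT)
    return (dateBig - dateSmall).days

@outputSchema("days_from_start:bag{t:tuple(cnt:int)}")
def daysFromStart(startDate, aBagOfDates):
    if aBagOfDates is None:
        return None
    result = []
    for somedate in aBagOfDates:
        # Each bag element is a tuple; the timestamp is its first field.
        if somedate is None or somedate[0] is None:
            continue
        aDateString = somedate[0]
        # Compare the length itself: "YYYY-MM-DD HH:MM:SS" is 19 characters.
        if len(aDateString) == 19:
            result.append(diffTime(startDate, aDateString))
    return result

# The example record's bag, modelled as a list of 1-tuples of str.
times = [("2012-03-04 10:10:10",), ("2012-03-04 10:10:11",)]
print(daysFromStart("2011-06-01 00:00:00", times))  # [277, 277]
```

If the field values still carry literal quotes from the input file, as in Cheolsoo's reply, the length becomes 21 and the quotes have to be removed before parsing.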
I apply the UDFs by executing the following command in the Grunt shell:
y = foreach x generate *, moins.daysFromStart('2011-06-01 00:00:00', times);
But when I try to store y, I get the following error message:
######## LOG #############
2012-10-01 16:35:03,499 [main] ERROR org.apache.pig.tools.pigstats.SimplePigStats - ERROR 2997: Unable to recreate exception from backed error: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Error executing function
    at org.apache.pig.scripting.jython.JythonFunction.exec(JythonFunction.java:106)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:216)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:258)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:316)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:332)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:284)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.runPipeline(PigGenericMapBase.java:271)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:266)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:64)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: Traceback (most recent call last):
  File "/gpfs/home02/fb16/bmacek/myUDF.py", line 38, in daysFromStart
    aDateString = ''.join(somedate[0])
TypeError: sequence item 0: expected string, int found

    at org.python.core.Py.TypeError(Py.java:235)
    at org.python.core.PyString.str_join(PyString.java:1900)
    at org.python.core.PyString$str_join_exposer.__call__(Unknown Source)
    at org.python.core.PyObject.__call__(PyObject.java:391)
    at org.python.pycode._pyx3.daysFromStart$3(/gpfs/home02/fb16/bmacek/myUDF.py:40)
    at org.python.pycode._pyx3.call_function(/gpfs/home02/fb16/bmacek/myUDF.py)
    at org.python.core.PyTableCode.call(PyTableCode.java:165)
    at org.python.core.PyBaseCode.call(PyBaseCode.java:301)
    at org.python.core.PyFunction.function___call__(PyFunction.java:376)
    at org.python.core.PyFunction.__call__(PyFunction.java:371)
    at org.python.core.PyFunction.__call__(PyFunction.java:361)
    at org.python.core.PyFunction.__call__(PyFunction.java:356)
    at org.apache.pig.scripting.jython.JythonFunction.exec(JythonFunction.java:103)
    ... 16 more
##############################
Depending on which variant of the line I use (aDateString = ''.join(somedate), or alternatively ''.join(somedate[0]) or ''.join(somedate[1])), I get different error messages: with index 0 I get the error message above, with index 1 I get an index-out-of-bounds error, and if I omit the square brackets, the error message says:
2012-10-01 16:40:46,280 [main] ERROR org.apache.pig.tools.pigstats.SimplePigStats - ERROR 2997: Unable to recreate exception from backed error: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Error executing function
    at org.apache.pig.scripting.jython.JythonFunction.exec(JythonFunction.java:106)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:216)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:258)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:316)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:332)
    at org.apache.pig.backend.hadoop.execution
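The three join variants can be reproduced in plain Python. This sketch rests on an assumption: as the "expected string, int found" message suggests, an element of the untyped times:bag{} carries its field as a raw byte array rather than a string. That case is modelled here with a CPython `bytearray` (`raw`); the case after declaring times:{(chararray)} is modelled as a 1-tuple of `str` (`typed`). Iterating a byte array yields integers, and every bag element is a 1-tuple, so index 1 is always out of range. The exact error wording differs between Jython and CPython 3.

```python
# Assumed model of one bag element with an untyped field (raw bytes) ...
raw = (bytearray(b"2012-03-04 10:10:10"),)
# ... and of the same element once the schema declares it a chararray.
typed = ("2012-03-04 10:10:10",)

def variants(element):
    """Evaluate the three variants from the message on one bag element."""
    def attempt(expr):
        try:
            return expr()
        except (TypeError, IndexError) as exc:
            return type(exc).__name__
    return (attempt(lambda: "".join(element)),      # join the tuple itself
            attempt(lambda: "".join(element[0])),   # join its first field
            attempt(lambda: "".join(element[1])))   # a second field does not exist

print(variants(raw))    # ('TypeError', 'TypeError', 'IndexError')
print(variants(typed))  # ('2012-03-04 10:10:10', '2012-03-04 10:10:10', 'IndexError')
```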
-
Re: Jython UDFs, Tuples and Stringconversions
Cheolsoo Park 2012-10-01, 21:11
Hi,
Please try this:
1. I used a tab-separated input file as follows:
cheolsoo@localhost:~/workspace/pig-svn $ cat tag_count_ts_pro_userpair
('a','b','c','d')	3	{('2012-03-04 10:10:10'),('2013-03-04 10:10:11')}
2. My udf is as follows:
import datetime

@outputSchema("days_from_start:bag{t:tuple(cnt:int)}")
def daysFromStart(startDate, aBagOfDates):
    if aBagOfDates is None: return None
    result=[]
    for someDate in aBagOfDates:
        if someDate is None: continue
        someDate = ''.join(someDate)
        if len(someDate)==21: result.append(diffTime(startDate, someDate))
    return result

@outputSchema("diff:int")
def diffTime(dateFrom, dateTil):
    dateSmall = datetime.datetime.strptime(dateFrom, "%Y-%m-%d %H:%M:%S")
    dateBig = datetime.datetime.strptime(dateTil[1:-1], "%Y-%m-%d %H:%M:%S")
    delta = dateBig - dateSmall
    return delta.days
3. My pig script is as follows:
register 'udf.py' using jython as moins;
x = load 'tag_count_ts_pro_userpair' using PigStorage('\t') as (group:(), cnt:int, times:{(chararray)});
y = foreach x generate *, moins.daysFromStart('2011-06-01 00:00:00', times);
dump y;
This returns:
(('a','b','c','d'),3,{('2012-03-04 10:10:10'),('2013-03-04 10:10:11')},{(277),(642)})
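The string handling in the reply and the two numbers it prints can be cross-checked in plain Python. The sketch assumes, as the len(someDate)==21 check and the dateTil[1:-1] slice imply, that the chararray values still carry the single quotes from the input file: a 19-character timestamp plus two quotes. 2012 is a leap year, so the span up to 2012-03-04 includes 29 February; the second date is exactly 365 days later, which gives 277 + 365 = 642.

```python
import datetime

FMT = "%Y-%m-%d %H:%M:%S"
start = datetime.datetime.strptime("2011-06-01 00:00:00", FMT)

# The two bag values as the reply's UDF receives them (assumption): a
# 19-character timestamp wrapped in literal single quotes, 21 characters.
values = ["'2012-03-04 10:10:10'", "'2013-03-04 10:10:11'"]

days = []
for v in values:
    if len(v) == 21:                                       # same filter as the reply
        stamp = datetime.datetime.strptime(v[1:-1], FMT)   # drop the quotes
        days.append((stamp - start).days)                  # whole days only

print(days)  # [277, 642]
```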
Thanks, Cheolsoo
On Mon, Oct 1, 2012 at 7:42 AM, Björn-Elmar Macek <[EMAIL PROTECTED]> wrote:
-
Re: Jython UDFs, Tuples and Stringconversions
Björn-Elmar Macek 2012-10-02, 08:13
Hi Cheolsoo,
Ah, thank you for the modifications: the output is what I expected it to be. I will have to look up the slice construct [1:-1], though. I was able to solve the issue by adding a complete schema, just as you did with times:{(chararray)}.
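On the [1:-1] construct: in Python it is a slice, starting at index 1 and stopping just before the last character, so it removes one character from each end of the string. A short sketch on the quoted value from the reply's input; the `strip("'")` line is a swapped-in alternative, not what the reply uses, which removes quotes only where present and so would also cope with unquoted values.

```python
stamp = "'2012-03-04 10:10:10'"   # value as the UDF receives it, quotes included

# s[1:-1]: skip index 0 (the opening quote) and stop before index -1 (the
# closing quote), leaving the bare 19-character timestamp.
print(stamp[1:-1])        # 2012-03-04 10:10:10
print(len(stamp[1:-1]))   # 19

# Alternative: strip() removes leading/trailing quotes only if present.
print("2012-03-04 10:10:10".strip("'"))  # 2012-03-04 10:10:10
```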
Thanks a lot for your time and insight!
Björn

On 01.10.2012 23:11, Cheolsoo Park wrote: