Pig, mail # user - Using symlink in Pig to perform a join with the help of a python UDF


Using symlink in Pig to perform a join with the help of a python UDF
Swaroop Sharma 2013-06-27, 12:23
Hi,

I have two files whose contents are as follows:

*File 1:* Field Names: eventid,clientid,channel_number,date_time

114,00001,5003,4/15/2013 11:10

114,00001,5003,4/15/2013 1:22

100,00001,5003,4/15/2013 23:08

114,00002,5002,4/16/2013 8:55
100,00002,5002,4/16/2013 8:15

*File 2:* Field Names: ChannelNumber,ProgramID,Start_Date_Time,End_Date_Time

5002,112311,4/16/2013 8:00,4/16/2013 8:30

5002,124313,4/16/2013 8:30,4/16/2013 9:00

5003,113214,4/15/2013 23:00,4/15/2013 23:30

5003,123213,4/15/2013 1:00,4/15/2013 2:00

5003,123343,4/15/2013 10:30,4/15/2013 11:30
I want to check whether *channel_number* from *File 1* matches *ChannelNumber*
from *File 2*, and whether *date_time* from *File 1* falls between
*Start_Date_Time* and *End_Date_Time* from *File 2*.

*Required Output:*

114,00001,5003,4/15/2013 11:10,5003,123343,4/15/2013 10:30,4/15/2013 11:30
114,00001,5003,4/15/2013 1:22,5003,123213,4/15/2013 1:00,4/15/2013 2:00
100,00001,5003,4/15/2013 23:08,5003,113214,4/15/2013 23:00,4/15/2013 23:30
114,00002,5002,4/16/2013 8:55,5002,124313,4/16/2013 8:30,4/16/2013 9:00
100,00002,5002,4/16/2013 8:15,5002,112311,4/16/2013 8:00,4/16/2013 8:30
I tried to write a UDF to perform this action and it is as follows:
*Python UDF:*

def myudf(lst):
 output = [];
 f = open("epg");
 for item in lst:
  for line in f:
   tup = eval(line);
   if item[2]== tup[0] and item[3] >= tup[2] and item[3] <= tup[3]:
    newtup = tuple(lst + tup);
    output.append(newtup);
 return output;
 f.close();
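
For comparison, here is a minimal sketch of how the UDF above could be restructured. It rests on several assumptions: the "epg" file holds plain comma-separated lines like File 2, timestamps follow the month/day/year hour:minute layout shown in the samples, and the extra `path` argument and the `load_epg` helper are hypothetical names added only so the function can be tried outside Pig. It addresses a few things that stand out in the original: `eval` cannot parse a raw comma-separated line, the file is exhausted after the first `item`, `lst + tup` joins the whole input list rather than the current item, `f.close()` sits after `return` and never runs, and timestamps are compared as strings. It has not been run under Pig's Jython engine.

```python
from datetime import datetime

# Assumed timestamp layout, e.g. "4/15/2013 23:08"
FMT = "%m/%d/%Y %H:%M"


def load_epg(path="epg"):
    """Read the schedule file once into a dict keyed by channel number."""
    by_channel = {}
    f = open(path)
    try:
        for line in f:
            parts = line.strip().split(",")
            if len(parts) != 4:
                continue  # skip blank or malformed lines
            channel, program, start, end = parts
            by_channel.setdefault(channel, []).append(
                (channel, program, start, end,
                 datetime.strptime(start, FMT),
                 datetime.strptime(end, FMT)))
    finally:
        f.close()
    return by_channel


def myudf(lst, path="epg"):
    """Pair each activity record with the schedule rows that contain it."""
    epg = load_epg(path)
    output = []
    for item in lst:
        when = datetime.strptime(item[3], FMT)
        # Only rows for the same channel are candidates.
        for row in epg.get(item[2], []):
            if row[4] <= when <= row[5]:
                # Append the four original schedule fields to the record.
                output.append(tuple(item) + row[:4])
    return output
```

Under Pig, a Jython UDF normally declares its return schema with the `@outputSchema` decorator; it is left out here so the sketch also runs in plain Python.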

I have created a symbolic link in Pig that points to the second file. I
created the symlink as follows:

    pig -Dmapred.cache.files=hdfs://path/to/file/filename#epg -Dmapred.create.symlink=yes script.pig

The *script.pig* file contains the Pig script that is executed when the
above command is run. The Pig script is as follows:

    file1 = load '/user/swaroop.sharma/sampledata_for_udf_test/activitydata/'
        using PigStorage(',') as (eventid: chararray, clientid: chararray,
        channel_number: chararray, date_time: chararray);

    register udf1.py using jython as test;

    grp_file1 = group file1 by clientid;

    finalfile = foreach grp_file1 generate group, test.myudf(file1) as
        file1: {(eventid: chararray,clientid: chararray,channelnumber:
        chararray,chararray,id: chararray,date_time: chararray,channelnumber:
        chararray,programid: chararray,start_date_time: chararray,end_date_time:
        chararray)};

    store finalfile into '/path/where/file/will/be/stored/' using PigStorage(',');

My Python UDF gets registered successfully. I have also tested the UDF by
running it in plain Python on the same data, using two lists as input.
I am also able to load the file successfully.

However, *finalfile* is not being created. An error occurs when the script
runs.

I have been trying to debug this for the past two days with no success. I am
new to Pig and am currently stuck.

Also, I know that I can join the two files on *Channel_Number* and then
filter for records whose *date_time* is between *start_date_time* and
*end_date_time*. However, this process takes a long time (using a map-side
join), as my actual data contains ~10 million rows in file1, with additional
fields, and ~50,000 records in file2.
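
One way to keep the per-record cost low with a schedule of this size is to index it by channel and binary-search on start time, rather than scanning every schedule row for each activity record. The sketch below is only an illustration under stated assumptions: programs on a channel do not overlap, timestamps follow the month/day/year hour:minute layout of the samples, and `build_index` and `find_program` are hypothetical helper names, not part of Pig.

```python
from bisect import bisect_right
from datetime import datetime

# Assumed timestamp layout, e.g. "4/16/2013 8:15"
FMT = "%m/%d/%Y %H:%M"


def build_index(schedule_rows):
    """Group (channel, program, start, end) rows by channel, sorted by start."""
    grouped = {}
    for channel, program, start, end in schedule_rows:
        grouped.setdefault(channel, []).append(
            (datetime.strptime(start, FMT),
             datetime.strptime(end, FMT),
             program))
    index = {}
    for channel, entries in grouped.items():
        entries.sort()
        # Keep the start times in their own list so bisect can search them.
        starts = [entry[0] for entry in entries]
        index[channel] = (starts, entries)
    return index


def find_program(index, channel, date_time):
    """Return the program airing on `channel` at `date_time`, or None."""
    if channel not in index:
        return None
    starts, entries = index[channel]
    when = datetime.strptime(date_time, FMT)
    # Last entry whose start time is <= when.
    pos = bisect_right(starts, when) - 1
    if pos < 0:
        return None
    start, end, program = entries[pos]
    if when <= end:
        return program
    return None


schedule = [
    ("5002", "112311", "4/16/2013 8:00", "4/16/2013 8:30"),
    ("5002", "124313", "4/16/2013 8:30", "4/16/2013 9:00"),
    ("5003", "113214", "4/15/2013 23:00", "4/15/2013 23:30"),
    ("5003", "123213", "4/15/2013 1:00", "4/15/2013 2:00"),
    ("5003", "123343", "4/15/2013 10:30", "4/15/2013 11:30"),
]
index = build_index(schedule)
print(find_program(index, "5002", "4/16/2013 8:15"))
```

Note that a timestamp falling exactly on a shared boundary (8:30 on channel 5002 in the sample) resolves to the later program only, whereas an inclusive range filter would match both.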

Therefore, I'm planning to use the *symlink* feature in Pig to reduce the
processing time.
Any help with this would be greatly appreciated!

Thanks,
Swaroop Sharma