Pig, mail # user - multiple file storage with pig

Re: multiple file storage with pig
Jacob Perkins 2013-07-30, 12:57

For your first question, what you want is called a projection of your "grouped" relation. Something like this should work:

grouped = foreach (group cleaned by (timestamp, sensor_name, sig_generator, sig_id)) generate flatten(group) as (timestamp, sensor_name, sig_generator, sig_id);

The flatten(...) takes your group key (a tuple with four fields) and turns it into four top-level fields in the grouped relation. What I don't understand is why you'd want to do it that way rather than simply doing a projection and a distinct (since you're not doing anything with the grouped data: counting, summing, or other operations on the JSON data for a single sensor and timestamp, etc.). That distinct would look like:

distinct_data = distinct (foreach cleaned generate timestamp..sig_id);
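
For comparison, grouping does earn its keep once you aggregate per key. A minimal sketch, assuming you wanted the number of events per key (counted and n_events are names made up for illustration, they are not in your script):

```pig
-- One output row per distinct key, plus how many input rows shared that key.
counted = foreach (group cleaned by (timestamp, sensor_name, sig_generator, sig_id)) generate flatten(group) as (timestamp, sensor_name, sig_generator, sig_id), COUNT(cleaned) as n_events;
```

If you never need anything like n_events, the distinct version says the same thing with less ceremony.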

Finally, for the last question, Pig has a little-known (and therefore little-used) storefunc called "MultiStorage". If you want to store separate files, one per timestamp-sensor_name-sig_generator-sig_id, then I'd first make a new field using Pig's CONCAT function to use as the file name, since storing files with parentheses in their names is probably a bad idea:

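-- CONCAT only accepts chararray (or bytearray) arguments, so the numeric fields are cast first.
with_filename = foreach distinct_data generate CONCAT((chararray)timestamp, CONCAT('-', CONCAT(sensor_name, CONCAT('-', CONCAT((chararray)sig_generator, CONCAT('-', (chararray)sig_id)))))) as filename, timestamp..sig_id;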

store with_filename into '/data' using org.apache.pig.piggybank.storage.MultiStorage('/data', '0');

where '/data' is the directory on HDFS where the files will go and '0' is the zero-based index of the field to use as the file name, called filename in this case. I would warn you though: this will open many output streams at once and could effectively DDoS your Hadoop data nodes if the number of unique filenames is large.
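
One way to gauge that risk before running the store is to count the file names first. A rough sketch, reusing with_filename from the statement above; names, name_count and n_files are just illustrative aliases:

```pig
-- How many separate outputs would MultiStorage have to create?
names = distinct (foreach with_filename generate filename);
name_count = foreach (group names all) generate COUNT(names) as n_files;
dump name_count;
```

If n_files comes back very large, a coarser key (for example, dropping sig_id from the file name) is probably the safer choice. Also note that MultiStorage lives in piggybank, so the script needs a register line for the piggybank jar as well; where that jar sits depends on your install.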


On Jul 30, 2013, at 5:21 AM, Pablo Nebrera wrote:

> Hello
> I have this pig script:
> register '/path_to_jars/elephant-bird-pig-3.0.7.jar';
> register '/path_to_jars/json-simple-1.1.1.jar';
> register '/path_to_jars/redBorder-pig.jar';
> data = load '/data/events/2013/07/29/16h03/part-00001.gz' using
> com.twitter.elephantbird.pig.load.JsonLoader() as (json: map[]);
> cleaned = foreach data generate json#'timestamp'/3600*3600 as timestamp,
> (chararray) json#'sensor_name' as sensor_name, (int) json#'sig_generator'
> as sig_generator, (int) json#'sig_id' as sig_id, json as data;
> grouped = GROUP cleaned BY (timestamp, sensor_name, sig_generator, sig_id);
> The input file is json file something like:
> {"timestamp":1374820560, "sensor_id":2, "sensor_name":"sensor-produccion",
> "sig_generator":1, "sig_id":402, "rev":11, "priority":3,
> "classification":"Misc activity", "msg":"Snort Alert [1:402:11]",
> "payload":"XXXXXXXXX", "proto":"icmp", "proto_id":1, "src":3232287141,
> "src_str":"", "src_name":"", "src_net":"", "src_net_name":"", "dst_name":"",
> "dst_str":"", "dst_net":"", "dst_net_name":"", "src_country":"N/A", "dst_country":"N/A",
> "src_country_code":"N/A", "dst_country_code":"N/A", "srcport":0,
> "dst":3232287230, "dstport":0, "ethsrc":"0:25:90:56:91:2d",
> "ethdst":"6c:62:6d:42:46:c3", "ethlength":594, "vlan":201,
> "vlan_name":"interna", "vlan_priority":0, "vlan_drop":0, "ttl":64,
> "tos":192, "id":53186, "dgmlen":576, "iplen":65544, "icmptype":3,
> "icmpcode":3, "icmpid":0, "icmpseq":0}
> {"timestamp":1374820618, "sensor_id":2, "sensor_id_snort":0,
> "sensor_name":"sensor-produccion", "sig_generator":1, "sig_id":402,
> "rev":11, "priority":3, "classification":"Misc activity", "msg":"Snort
> Alert [1:402:11]", "payload":"XXXXX2", "proto":"icmp", "proto_id":1,
> "src":3232261121, "src_str":"", "src_name":"",
> "src_net":"", "src_net_name":"",
> "dst_name":"", "dst_str":"", "dst_net":"", "dst_net_name":"", "src_country":"N/A",
> "dst_country":"N/A", "src_country_code":"N/A", "dst_country_code":"N/A",