NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB

Pig >> mail # user >> Pig Latin Program with special Load Function


Pig Latin Program with special Load Function
I'm currently writing a Pig Latin program:

    A = load 'hbase://mytable1' using my.packages.CustomizeHBaseStorage('VALUE',
        '-loadKey true', 'myrowkey1') as (rowkey:chararray, columncontent:map[]);
    ABag = foreach A generate flatten(my.packages.MapToBag($1)) as
        (output:chararray);

CustomizeHBaseStorage loads the row "myrowkey1", and the map for that row key
is then transformed into a bag. That works fine so far.
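The post doesn't show what my.packages.MapToBag does internally. As a minimal stdlib-only sketch, assuming (hypothetically) that it emits one chararray per map value and that each value is a row key for mytable2, the map-to-bag step looks like this; `MapToBagSketch` and `mapToBag` are illustrative names, not the real UDF:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for my.packages.MapToBag. ASSUMPTION: it emits one
// entry per map value, and each value names a row key to look up in mytable2.
final class MapToBagSketch {

    /** Returns the map's values as strings, ordered by column qualifier. */
    static List<String> mapToBag(Map<String, ?> columnContent) {
        List<String> bag = new ArrayList<>();
        if (columnContent == null) {
            return bag; // a null map yields an empty bag
        }
        // Copying into a TreeMap gives a deterministic order by column qualifier.
        for (Object value : new TreeMap<String, Object>(columnContent).values()) {
            bag.add(String.valueOf(value));
        }
        return bag;
    }

    public static void main(String[] args) {
        Map<String, Object> row = Map.of("c2", "rowkeyB", "c1", "rowkeyA");
        System.out.println(mapToBag(row)); // [rowkeyA, rowkeyB]
    }
}
```

After `flatten`, each of these strings becomes its own record in ABag, which is why every ABag entry can be treated as one row key to fetch.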

ABag now contains some entries, and I want to use them to load new rows: each
entry in the bag identifies a row key in mytable2 that I have to load next. So
I tried something like this:

    X = FOREACH ABag {
        TMP = load 'hbase://mytable2' using my.packages.CustomizeHBaseStorage('VALUE',
            '-loadKey true', '$0') as (rowkey:chararray, columncontent:map[]);
        GENERATE (TMP.$0);
    }

This does not work because, as far as I know, LOAD is not allowed inside a
nested FOREACH block (only operators such as FILTER, ORDER BY, DISTINCT,
LIMIT, CROSS and FOREACH are).

So I tried to build my own EvalFunc:

    X = FOREACH ABag GENERATE my.packages.MyNewUDF($0);

Here is the Java code for MyNewUDF:

    ...
    public DataBag exec(Tuple input) throws IOException {
        DataBag result = null;
        try {
            result = bagFactory.newDefaultBag();
            CustomizeHBaseStorage loader = new CustomizeHBaseStorage("VALUE",
                    "-loadKey true", input.get(0).toString());
            loader.getInputFormat();
            // getNext() is called without prepareToRead() having run first
            Tuple curTuple = loader.getNext();
            while (curTuple != null) {
                result.add(curTuple);
                curTuple = loader.getNext();
            }
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return result;
    }
    ...

I think this should work, but I get a NullPointerException because the
RecordReader in HBaseStorage has not been initialized when getNext() is
called. Can anybody tell me how to initialize the RecordReader (and, I think,
the PigSplit as well, since both are needed for
CustomizeHBaseStorage.prepareToRead(RecordReader reader, PigSplit split))?
Suggestions for a different approach would also be welcome.
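Why the NullPointerException happens can be modelled without Pig at all. Roughly, Pig drives a LoadFunc on the backend by calling setLocation(...) and getInputFormat(), letting Hadoop create a RecordReader for each PigSplit, and only then calling prepareToRead(reader, split) before the first getNext(); as described above, HBaseStorage keeps that reader and getNext() reads from it. MyNewUDF skips the prepareToRead step. The classes below (`FakeRecordReader`, `FakeLoader`) are hypothetical stand-ins that only mimic where the reader gets set; they are not the Pig or HBase API:

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical, stdlib-only model of the LoadFunc call order. These classes
// are NOT the Pig or HBase API; they only mimic where the reader is assigned.
final class LoaderLifecycleSketch {

    /** Stand-in for the RecordReader Hadoop creates for one split. */
    static final class FakeRecordReader {
        private final Iterator<String> rows;

        FakeRecordReader(List<String> rows) {
            this.rows = rows.iterator();
        }

        /** Returns the next row, or null once the split is exhausted. */
        String nextRow() {
            return rows.hasNext() ? rows.next() : null;
        }
    }

    /** Stand-in for a loader such as CustomizeHBaseStorage. */
    static final class FakeLoader {
        private FakeRecordReader reader; // stays null until prepareToRead()

        /** Mirrors prepareToRead(RecordReader, PigSplit): the runtime hands over the reader. */
        void prepareToRead(FakeRecordReader reader) {
            this.reader = reader;
        }

        /** Mirrors getNext(): dereferences the reader without a null check. */
        String getNext() {
            return reader.nextRow();
        }
    }

    public static void main(String[] args) {
        FakeLoader loader = new FakeLoader();
        try {
            loader.getNext(); // what MyNewUDF does: no prepareToRead() first
        } catch (NullPointerException e) {
            System.out.println("NullPointerException: reader was never prepared");
        }
        // What the runtime does before the first getNext():
        loader.prepareToRead(new FakeRecordReader(List.of("r1", "r2")));
        for (String row = loader.getNext(); row != null; row = loader.getNext()) {
            System.out.println(row);
        }
    }
}
```

The model suggests that the missing piece is not a single field: a real RecordReader and PigSplit come from the per-task split machinery, so reproducing them inside an EvalFunc would mean rebuilding that setup by hand.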
BTW, I know that I can load all of mytable2 into a new alias and then JOIN
ABag with it, but I am trying to optimize my program: it is not necessary to
load the whole of mytable2. What I am trying to build is a "join" with
information passing, where the keys in ABag decide which rows of mytable2 are read.
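The "join with information passing" idea can be illustrated with an in-memory model: instead of reading every row of mytable2 and joining, each entry of ABag is used as a point lookup by row key (in HBase terms, one Get per key rather than a full Scan). The table here is a plain Map and all names (`KeyLookupJoinSketch`, `scanJoin`, `lookupJoin`) are hypothetical; this is a sketch of the access pattern, not a Pig UDF:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory model only: "table" stands in for mytable2 (row key -> row content)
// and "keys" for the entries of ABag. All names here are hypothetical.
final class KeyLookupJoinSketch {

    /** Matched rows plus how many reads against the table were needed. */
    static final class JoinResult {
        final List<String> rows;
        final int rowsRead;

        JoinResult(List<String> rows, int rowsRead) {
            this.rows = rows;
            this.rowsRead = rowsRead;
        }
    }

    /** JOIN-style: scan every table row and keep those whose key is wanted. */
    static JoinResult scanJoin(List<String> keys, Map<String, String> table) {
        // Count key occurrences so duplicate keys in ABag yield duplicate matches.
        Map<String, Integer> wanted = new HashMap<>();
        for (String key : keys) {
            wanted.merge(key, 1, Integer::sum);
        }
        List<String> out = new ArrayList<>();
        int read = 0;
        for (Map.Entry<String, String> row : table.entrySet()) {
            read++; // every row is read, matched or not
            int matches = wanted.getOrDefault(row.getKey(), 0);
            for (int i = 0; i < matches; i++) {
                out.add(row.getValue());
            }
        }
        return new JoinResult(out, read);
    }

    /** Information passing: use each key from ABag as a point lookup. */
    static JoinResult lookupJoin(List<String> keys, Map<String, String> table) {
        List<String> out = new ArrayList<>();
        int read = 0;
        for (String key : keys) {
            read++; // one lookup per key, hit or miss
            String row = table.get(key);
            if (row != null) {
                out.add(row);
            }
        }
        return new JoinResult(out, read);
    }

    public static void main(String[] args) {
        Map<String, String> table = new TreeMap<>();
        for (int i = 0; i < 1000; i++) {
            table.put("row" + i, "content" + i);
        }
        List<String> keys = List.of("row7", "row42", "missing");
        JoinResult scan = scanJoin(keys, table);
        JoinResult lookup = lookupJoin(keys, table);
        System.out.println("scan read " + scan.rowsRead + " rows, lookup read "
                + lookup.rowsRead + " rows, matches: " + lookup.rows);
    }
}
```

Both strategies return the same matches (possibly in a different order), but with 1000 rows and three keys the scan touches 1000 rows while the lookups touch 3. The balance shifts when ABag covers a large fraction of mytable2, since each point lookup is a separate round trip.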

Thanks