Pig, mail # user - Pig Latin Program with special Load Function


Pig Latin Program with special Load Function
John 2013-08-21, 11:12
I'm currently writing a Pig Latin program:

    A = load 'hbase://mytable1' using my.packages.CustomizeHBaseStorage('VALUE', '-loadKey true', 'myrowkey1')
        as (rowkey:chararray, columncontent:map[]);
    ABag = foreach A generate flatten(my.packages.MapToBag($1)) as (output:chararray);

CustomizeHBaseStorage loads the row "myrowkey1", and the map for that row key
is then transformed into a bag. That works fine so far.

ABag now contains some entries. Each entry in the bag identifies a new row key
that I have to load next, so I want to use these entries to load more rows.
I tried something like this:

    X = FOREACH ABag {
        TMP = load 'hbase://mytable2' using my.packages.CustomizeHBaseStorage('VALUE', '-loadKey true', '$0')
            as (rowkey:chararray, columncontent:map[]);
        GENERATE (TMP.$0);
    }

This does not work because, as far as I know, a load statement is not allowed
inside a FOREACH block.

So I tried to build my own EvalFunc:

    X = FOREACH ABag GENERATE my.packages.MyNewUDF($0);

Here is the Java code for MyNewUDF:

    ...
    public DataBag exec(Tuple input) throws IOException {
        DataBag result = null;
        try {
            result = bagFactory.newDefaultBag();
            // Build a loader restricted to the row key passed in as $0.
            CustomizeHBaseStorage loader = new CustomizeHBaseStorage(
                    "VALUE", "-loadKey true", input.get(0).toString());
            loader.getInputFormat();
            // This is where the NullPointerException is thrown:
            // no RecordReader has been handed to the loader yet.
            Tuple curTuple = loader.getNext();
            while (curTuple != null) {
                result.add(curTuple);
                curTuple = loader.getNext();
            }
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return result;
    }
    ...

I think this would work, but I get a NullPointerException because the
RecordReader in the HBaseStorage is not initialized when getNext() is
executed. If anybody can tell me how to initialize the RecordReader (and I
think the PigSplit too, because it is required by
CustomizeHBaseStorage.prepareToRead(RecordReader reader, PigSplit split)),
or can suggest another approach, I would be thankful.
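
To make the failure concrete, here is a minimal sketch of what I believe is happening. It uses made-up stand-in classes (FakeReader, FakeLoader), not the real Pig or Hadoop classes, and only assumes that the loader keeps its reader in a field that stays null until prepareToRead() is called:

```java
import java.util.Iterator;
import java.util.List;

final class LoaderLifecycleSketch {

    /** Hypothetical stand-in for a RecordReader; not the real Hadoop class. */
    static final class FakeReader {
        private final Iterator<String> rows;

        FakeReader(List<String> rows) {
            this.rows = rows.iterator();
        }

        /** Returns the next row, or null when the input is exhausted. */
        String next() {
            return rows.hasNext() ? rows.next() : null;
        }
    }

    /** Hypothetical stand-in for a load function such as CustomizeHBaseStorage. */
    static final class FakeLoader {
        private FakeReader reader; // stays null until prepareToRead() is called

        void prepareToRead(FakeReader reader) {
            this.reader = reader;
        }

        String getNext() {
            return reader.next(); // throws NullPointerException if reader is null
        }
    }

    /** Mirrors the UDF above: getNext() is called without prepareToRead(). */
    static boolean failsWithoutPrepare() {
        try {
            new FakeLoader().getNext();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    /** Hands the loader a reader first, then drains it and counts the rows. */
    static int countRowsAfterPrepare(List<String> rows) {
        FakeLoader loader = new FakeLoader();
        loader.prepareToRead(new FakeReader(rows));
        int count = 0;
        while (loader.getNext() != null) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(failsWithoutPrepare());
        System.out.println(countRowsAfterPrepare(List.of("row1", "row2")));
    }
}
```

In a normal Pig job the framework creates the reader and passes it to prepareToRead() before getNext() is ever called; inside my UDF nothing does that step.
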
BTW, I know that I can load the whole of mytable2 into a new alias and then
JOIN ABag with that alias, but I am trying to optimize my program, because it
is not necessary to load the whole of mytable2. I am trying to build a "join"
with information passing.
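
To illustrate what I mean by a "join" with information passing, here is a minimal sketch in plain Java. A Map stands in for mytable2, and the class and method names (LookupJoinSketch, lookupJoin, fullScanJoin) are made up for this example; nothing here uses the real Pig or HBase API:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class LookupJoinSketch {

    /** What I want: one point lookup per key from the bag, nothing else is read. */
    static List<String> lookupJoin(List<String> keys, Map<String, String> table) {
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            String value = table.get(key); // stand-in for reading a single row
            if (value != null) {
                out.add(key + "=" + value);
            }
        }
        return out;
    }

    /** What a plain JOIN amounts to: every row of the table is read and filtered. */
    static List<String> fullScanJoin(List<String> keys, Map<String, String> table) {
        Set<String> wanted = new HashSet<>(keys);
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> row : table.entrySet()) {
            if (wanted.contains(row.getKey())) {
                out.add(row.getKey() + "=" + row.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> table = new TreeMap<>();
        table.put("k1", "a");
        table.put("k2", "b");
        table.put("k3", "c");
        List<String> keys = List.of("k1", "k3");
        System.out.println(lookupJoin(keys, table));
        System.out.println(fullScanJoin(keys, table));
    }
}
```

Both methods return the same rows for the same keys, but lookupJoin touches only as many rows as there are entries in the bag, while fullScanJoin reads the whole table.
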

Thanks