Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, cassandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, CentOS, red hat, debian, puppet labs, java, senseiDB
 Search Hadoop and all its subprojects:

Switch to Threaded View
Pig >> mail # user >> Issue with LoadFunc & Slicer


Copy link to this message
-
Issue with LoadFunc & Slicer
Hello,

In the process of trying to add support for HBase 0.20.0 in
PIG (trunk), I was following the tutorial from the PIG documentation:

http://hadoop.apache.org/pig/docs/r0.3.0/udf.html#Custom+Slicer

Unfortunately, when I try:

A = LOAD '27' USING RangeSlicer();
dump A;

PIG reports the following error:

2009-09-14 15:33:46,395 [main] ERROR
org.apache.pig.tools.grunt.Grunt - ERROR 2081: Unable to setup the
load function.

If I provide an existing file, instead of '27', I no longer have
this error, but the output of the dump function is empty.

Any ideas?
Here is my RangeSlicer() code:

========================================================

package com.ubikod.ermin.backend.pigudfs;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
import org.apache.pig.Slice;
import org.apache.pig.Slicer;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class RangeSlicer extends Utf8StorageConverter implements Slicer,
   LoadFunc
{
   private static final Log LOG = LogFactory.getLog(RangeSlicer.class);

   public RangeSlicer()
   {
     LOG.info("RangeSlicer");
   }

   /**
    * Expects location to be a Stringified integer, and makes
    * Integer.parseInt(location) slices. Each slice generates a
single value, its
    * index in the sequence of slices.
    */
   public Slice[] slice(DataStorage store, String location) throws
IOException
   {
     LOG.info("slice #################" + location);
     location = "30";
     // Note: validate has already made sure that location is an integer
     int numslices = Integer.parseInt(location);
     LOG.info("slice #################" + numslices);
     Slice[] slices = new Slice[numslices];
     for (int i = 0; i < slices.length; i++)
     {
       slices[i] = new SingleValueSlice(i);
     }
     return slices;
   }

   public void validate(DataStorage store, String location) throws
IOException
   {
     try
     {
       LOG.info("validate #################" + location);
       Integer.parseInt("30");
       LOG.info("validate #################" + location);
     }
     catch (NumberFormatException nfe)
     {
       throw new IOException(nfe.getMessage());
     }
   }

   /**
    * A Slice that returns a single value from next.
    */
   public static class SingleValueSlice implements Slice
   {
     // note this value is set by the Slicer and will get serialized and
     // deserialized at the remote processing node
     public int val;
     // since we just have a single value, we can use a boolean
rather than a
     // counter
     private transient boolean read;

     public SingleValueSlice(int value)
     {
       LOG.info("SingleValueSlice #################" + value);

       this.val = value;
     }

     public void close() throws IOException
     {
     }

     public long getLength()
     {
       return 1;
     }

     public String[] getLocations()
     {
       return new String[0];
     }

     public long getStart()
     {
       return 0;
     }

     public long getPos() throws IOException
     {
       return read ? 1 : 0;
     }

     public float getProgress() throws IOException
     {
       return read ? 1 : 0;
     }

     public void init(DataStorage store) throws IOException
     {
     }

     public boolean next(Tuple value) throws IOException
     {
       if (!read)
       {
         LOG.info("next #################" + value);

         value.append(val);
         read = true;
         return true;
       }
       return false;
     }

     private static final long serialVersionUID = 1L;
   }

   @Override
   public void bindTo(String arg0, BufferedPositionedInputStream arg1,
     long arg2, long arg3) throws IOException
   {
     LOG.info("bindTo #################" + arg0);
   }

   @Override
   public Schema determineSchema(String arg0, ExecType arg1,
DataStorage arg2)
     throws IOException
   {
     // TODO Auto-generated method stub
     return null;
   }

   @Override
   public void fieldsToRead(Schema arg0)
   {
     // TODO Auto-generated method stub
   }

   @Override
   public Tuple getNext() throws IOException
   {
     // TODO Auto-generated method stub
     return null;
   }
}
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, cassandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, CentOS, red hat, debian, puppet labs, java, senseiDB