Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Pig, mail # user - Issue with LoadFunc & Slicer


Copy link to this message
-
Issue with LoadFunc & Slicer
Vincent BARAT 2009-09-14, 13:38
Hello,

While trying to add support for HBase 0.20.0 to
PIG (trunk), I tried the tutorial from the PIG documentation:

http://hadoop.apache.org/pig/docs/r0.3.0/udf.html#Custom+Slicer

Unfortunately, when I try:

A = LOAD '27' USING RangeSlicer();
dump A;

PIG reports the following error:

2009-09-14 15:33:46,395 [main] ERROR
org.apache.pig.tools.grunt.Grunt - ERROR 2081: Unable to setup the
load function.

If I provide an existing file, instead of '27', I no longer have
this error, but the output of the dump function is empty.

Any ideas?
Here is my RangeSlicer() code:

========================================================

package com.ubikod.ermin.backend.pigudfs;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
import org.apache.pig.Slice;
import org.apache.pig.Slicer;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class RangeSlicer extends Utf8StorageConverter implements Slicer,
   LoadFunc
{
   private static final Log LOG = LogFactory.getLog(RangeSlicer.class);

   public RangeSlicer()
   {
     LOG.info("RangeSlicer");
   }

   /**
    * Expects location to be a Stringified integer, and makes
    * Integer.parseInt(location) slices. Each slice generates a
single value, its
    * index in the sequence of slices.
    */
   public Slice[] slice(DataStorage store, String location) throws
IOException
   {
     LOG.info("slice #################" + location);
     location = "30";
     // Note: validate has already made sure that location is an integer
     int numslices = Integer.parseInt(location);
     LOG.info("slice #################" + numslices);
     Slice[] slices = new Slice[numslices];
     for (int i = 0; i < slices.length; i++)
     {
       slices[i] = new SingleValueSlice(i);
     }
     return slices;
   }

   public void validate(DataStorage store, String location) throws
IOException
   {
     try
     {
       LOG.info("validate #################" + location);
       Integer.parseInt("30");
       LOG.info("validate #################" + location);
     }
     catch (NumberFormatException nfe)
     {
       throw new IOException(nfe.getMessage());
     }
   }

   /**
    * A Slice that returns a single value from next.
    */
   public static class SingleValueSlice implements Slice
   {
     // note this value is set by the Slicer and will get serialized and
     // deserialized at the remote processing node
     public int val;
     // since we just have a single value, we can use a boolean
rather than a
     // counter
     private transient boolean read;

     public SingleValueSlice(int value)
     {
       LOG.info("SingleValueSlice #################" + value);

       this.val = value;
     }

     public void close() throws IOException
     {
     }

     public long getLength()
     {
       return 1;
     }

     public String[] getLocations()
     {
       return new String[0];
     }

     public long getStart()
     {
       return 0;
     }

     public long getPos() throws IOException
     {
       return read ? 1 : 0;
     }

     public float getProgress() throws IOException
     {
       return read ? 1 : 0;
     }

     public void init(DataStorage store) throws IOException
     {
     }

     public boolean next(Tuple value) throws IOException
     {
       if (!read)
       {
         LOG.info("next #################" + value);

         value.append(val);
         read = true;
         return true;
       }
       return false;
     }

     private static final long serialVersionUID = 1L;
   }

   @Override
   public void bindTo(String arg0, BufferedPositionedInputStream arg1,
     long arg2, long arg3) throws IOException
   {
     LOG.info("bindTo #################" + arg0);
   }

   @Override
   public Schema determineSchema(String arg0, ExecType arg1,
DataStorage arg2)
     throws IOException
   {
     // TODO Auto-generated method stub
     return null;
   }

   @Override
   public void fieldsToRead(Schema arg0)
   {
     // TODO Auto-generated method stub
   }

   @Override
   public Tuple getNext() throws IOException
   {
     // TODO Auto-generated method stub
     return null;
   }
}