Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, cassandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB
 Search Hadoop and all its subprojects:

Switch to Plain View
Pig >> mail # user >> PigStorage


+
pablomar 2012-11-16, 20:48
+
Dmitriy Ryaboy 2012-11-16, 22:15
+
Bill Graham 2012-11-19, 17:16
+
pablomar 2012-11-19, 17:24
Copy link to this message
-
Re: PigStorage
hi all,

I did it as simply as I could. What about these changes?
PigStorage.java
original:
    /**
     * Appends the field spanning [start, end) of {@code buf} to the tuple
     * being built in {@code mProtoTuple}.  {@code end} is exclusive, so an
     * empty span (start == end) represents a NULL field.
     * (This is the current, private implementation in PigStorage — quoted
     * here unchanged for comparison with the proposed version below.)
     */
    private void readField(byte[] buf, int start, int end) {
        if (start == end) {
            // NULL value
            mProtoTuple.add(null);
        } else {
            mProtoTuple.add(new DataByteArray(buf, start, end));
        }
    }
new:
    /**
     * Extension hook: adds one parsed field value (or {@code null} for a
     * NULL field) to the tuple under construction.  Declared protected so
     * subclasses can intercept or transform values without copying
     * {@code getNext()}.
     */
    protected void addToCurrentTuple(DataByteArray data) {
            mProtoTuple.add(data);
    }

    protected void readField(byte[] buf, int start, int end) {
        if (start == end) {
            // NULL value
            addToCurrentTuple(null);
        } else {
            addToCurrentTuple(new DataByteArray(buf, start, end));
        }
    }

So, what's the advantage?
with the new version, if I want to extend PigStorage just to override
readField, I do something like:

import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataByteArray;

public class MyLoader extends PigStorage {
  public MyLoader() {
    this("\t");
  }

  public MyLoader(String delimiter) {
    super(delimiter);
  }

  @Override
  protected void readField(byte[] buf, int start, int end) {
    // remove the field name (example: min=, max=)
    for (int i=start; i<=end;i++) {
      if (buf[i] == '=') {
        start = i + 1;
        break;
      }
    }

    if (start == end) {
      // NULL value
      addToCurrentTuple(null);
    } else {
      addToCurrentTuple(new DataByteArray(buf, start, end));
    }
  }
}

currently, just to override readField, I also need to copy/paste a lot from
PigStorage:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Properties;
import org.apache.hadoop.io.Text;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.PigException;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.StorageUtil;
import org.apache.pig.impl.util.UDFContext;

public class MyLoaderextends PigStorage {
  private byte fieldDel = '\t';
  private ArrayList<Object> mProtoTuple = null;
  private TupleFactory mTupleFactory = TupleFactory.getInstance();
  private boolean mRequiredColumnsInitialized = false;

  public MyLoader() {
    this("\t");
  }

  public MyLoader(String delimiter) {
    super(delimiter);
    fieldDel = StorageUtil.parseFieldDel(delimiter);
  }

  // Exactly the same as in PigStorage, but I needed to modify readField,
  // which is private, so I have to copy all of it
    @Override
    public Tuple getNext() throws IOException {
        mProtoTuple = new ArrayList<Object>();
        if (!mRequiredColumnsInitialized) {
            if (signature!=null) {
                Properties p UDFContext.getUDFContext().getUDFProperties(this.getClass());
                mRequiredColumns (boolean[])ObjectSerializer.deserialize(p.getProperty(signature));
            }
            mRequiredColumnsInitialized = true;
        }
        try {
            boolean notDone = in.nextKeyValue();
            if (!notDone) {
                return null;

}

            Text value = (Text) in.getCurrentValue();
            byte[] buf = value.getBytes();
            int len = value.getLength();
            int start = 0;
            int fieldID = 0;
            for (int i = 0; i < len; i++) {
                if (buf[i] == fieldDel) {
                    if (mRequiredColumns==null ||
(mRequiredColumns.length>fieldID && mRequiredColumns[fieldID]))
                        readField(buf, start, i);
                    start = i + 1;
                    fieldID++;
                }
            }
            // pick up the last field
            if (start <= len && (mRequiredColumns==null ||
(mRequiredColumns.length>fieldID && mRequiredColumns[fieldID]))) {
                readField(buf, start, len);
            }
            Tuple t =  mTupleFactory.newTupleNoCopy(mProtoTuple);
            return t;
        } catch (InterruptedException e) {
            int errCode = 6018;
            String errMsg = "Error while reading input";
            throw new ExecException(errMsg, errCode,
                    PigException.REMOTE_ENVIRONMENT, e);
        }

    }

  private void readField(byte[] buf, int start, int end) {
    // remove the field name (example: min=, max=)
    for (int i=start; i<=end;i++) {
      if (buf[i] == '=') {
        start = i + 1;
        break;
      }
    }

    if (start == end) {
      // NULL value
      mProtoTuple.add(null);
    } else {
      mProtoTuple.add(new DataByteArray(buf, start, end));
    }
  }
}
On Mon, Nov 19, 2012 at 12:24 PM, pablomar
<[EMAIL PROTECTED]>wrote:

+
Jonathan Coveney 2012-11-19, 23:32
+
pablomar 2012-11-20, 00:38
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, cassandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB