|
|
-
Re: PigStorage (pablomar, 2012-11-19 21:17)
Hi all,
I kept it as simple as I could. What about these changes? PigStorage.java original: private void readField(byte[] buf, int start, int end) { if (start == end) { // NULL value mProtoTuple.add(null); } else { mProtoTuple.add(new DataByteArray(buf, start, end)); } } new: protected void addToCurrentTuple(DataByteArray data) { mProtoTuple.add(data); } protected void readField(byte[] buf, int start, int end) { if (start == end) { // NULL value addToCurrentTuple(null); } else { addToCurrentTuple(new DataByteArray(buf, start, end)); } } So, what's the advantage? With the new version, if I want to extend PigStorage just to override readField, I do something like: import org.apache.pig.builtin.PigStorage; import org.apache.pig.data.DataByteArray; public class MyLoader extends PigStorage { public MyLoader() { this("\t"); } public MyLoader(String delimiter) { super(delimiter); } @Override protected void readField(byte[] buf, int start, int end) { // remove the field name (example: min=, max=) for (int i=start; i<=end;i++) { if (buf[i] == '=') { start = i + 1; break; } } if (start == end) { // NULL value addToCurrentTuple(null); } else { addToCurrentTuple(new DataByteArray(buf, start, end)); } } } Currently, just to override readField, I also need to copy/paste a lot from PigStorage: import java.io.IOException; import java.util.ArrayList; import java.util.Properties; import org.apache.hadoop.io.Text; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.builtin.PigStorage; import org.apache.pig.PigException; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.StorageUtil; import org.apache.pig.impl.util.UDFContext; public class MyLoader extends PigStorage { private byte fieldDel = '\t'; private ArrayList<Object> mProtoTuple = null; private TupleFactory mTupleFactory = TupleFactory.getInstance(); private
boolean mRequiredColumnsInitialized = false; public MyLoader() { this("\t"); } public MyLoader(String delimiter) { super(delimiter); fieldDel = StorageUtil.parseFieldDel(delimiter); } // exactly the same as in PigStorage, but I needed to modify readField, which is private, so I had to copy all of it @Override public Tuple getNext() throws IOException { mProtoTuple = new ArrayList<Object>(); if (!mRequiredColumnsInitialized) { if (signature!=null) { Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass()); mRequiredColumns = (boolean[])ObjectSerializer.deserialize(p.getProperty(signature)); } mRequiredColumnsInitialized = true; } try { boolean notDone = in.nextKeyValue(); if (!notDone) { return null; } Text value = (Text) in.getCurrentValue(); byte[] buf = value.getBytes(); int len = value.getLength(); int start = 0; int fieldID = 0; for (int i = 0; i < len; i++) { if (buf[i] == fieldDel) { if (mRequiredColumns==null || (mRequiredColumns.length>fieldID && mRequiredColumns[fieldID])) readField(buf, start, i); start = i + 1; fieldID++; } } // pick up the last field if (start <= len && (mRequiredColumns==null || (mRequiredColumns.length>fieldID && mRequiredColumns[fieldID]))) { readField(buf, start, len); } Tuple t = mTupleFactory.newTupleNoCopy(mProtoTuple); return t; } catch (InterruptedException e) { int errCode = 6018; String errMsg = "Error while reading input"; throw new ExecException(errMsg, errCode, PigException.REMOTE_ENVIRONMENT, e); } } private void readField(byte[] buf, int start, int end) { // remove the field name (example: min=, max=) for (int i=start; i<=end;i++) { if (buf[i] == '=') { start = i + 1; break; } } if (start == end) { // NULL value mProtoTuple.add(null); } else { mProtoTuple.add(new DataByteArray(buf, start, end)); } } } On Mon, Nov 19, 2012 at 12:24 PM, pablomar <[EMAIL PROTECTED]> wrote: |