Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Plain View
Pig >> mail # user >> PigStorage


+
pablomar 2012-11-16, 20:48
+
Dmitriy Ryaboy 2012-11-16, 22:15
+
Bill Graham 2012-11-19, 17:16
+
pablomar 2012-11-19, 17:24
Copy link to this message
-
Re: PigStorage
hi all,

I kept it as simple as I could. What do you think of these changes?
PigStorage.java
original:
    private void readField(byte[] buf, int start, int end) {
        if (start == end) {
            // NULL value
            mProtoTuple.add(null);
        } else {
            mProtoTuple.add(new DataByteArray(buf, start, end));
        }
    }
new:
    protected void addToCurrentTuple(DataByteArray data) {
            mProtoTuple.add(data);
    }

    protected void readField(byte[] buf, int start, int end) {
        if (start == end) {
            // NULL value
            addToCurrentTuple(null);
        } else {
            addToCurrentTuple(new DataByteArray(buf, start, end));
        }
    }

So, what's the advantage?
With the new version, if I want to extend PigStorage just to override
readField, I can do something like:

import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataByteArray;

public class MyLoader extends PigStorage {
  public MyLoader() {
    this("\t");
  }

  public MyLoader(String delimiter) {
    super(delimiter);
  }

  @Override
  protected void readField(byte[] buf, int start, int end) {
    // remove the field name (example: min=, max=)
    for (int i=start; i<=end;i++) {
      if (buf[i] == '=') {
        start = i + 1;
        break;
      }
    }

    if (start == end) {
      // NULL value
      addToCurrentTuple(null);
    } else {
      addToCurrentTuple(new DataByteArray(buf, start, end));
    }
  }
}

Currently, just to override readField, I also need to copy/paste a lot from
PigStorage:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Properties;
import org.apache.hadoop.io.Text;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.PigException;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.StorageUtil;
import org.apache.pig.impl.util.UDFContext;

public class MyLoaderextends PigStorage {
  private byte fieldDel = '\t';
  private ArrayList<Object> mProtoTuple = null;
  private TupleFactory mTupleFactory = TupleFactory.getInstance();
  private boolean mRequiredColumnsInitialized = false;

  public MyLoader() {
    this("\t");
  }

  public MyLoader(String delimiter) {
    super(delimiter);
    fieldDel = StorageUtil.parseFieldDel(delimiter);
  }

  // exactly the same than in PigStorage, but I needed to modify readField
which is private, so I have to copy all of it
    @Override
    public Tuple getNext() throws IOException {
        mProtoTuple = new ArrayList<Object>();
        if (!mRequiredColumnsInitialized) {
            if (signature!=null) {
                Properties p UDFContext.getUDFContext().getUDFProperties(this.getClass());
                mRequiredColumns (boolean[])ObjectSerializer.deserialize(p.getProperty(signature));
            }
            mRequiredColumnsInitialized = true;
        }
        try {
            boolean notDone = in.nextKeyValue();
            if (!notDone) {
                return null;

}

            Text value = (Text) in.getCurrentValue();
            byte[] buf = value.getBytes();
            int len = value.getLength();
            int start = 0;
            int fieldID = 0;
            for (int i = 0; i < len; i++) {
                if (buf[i] == fieldDel) {
                    if (mRequiredColumns==null ||
(mRequiredColumns.length>fieldID && mRequiredColumns[fieldID]))
                        readField(buf, start, i);
                    start = i + 1;
                    fieldID++;
                }
            }
            // pick up the last field
            if (start <= len && (mRequiredColumns==null ||
(mRequiredColumns.length>fieldID && mRequiredColumns[fieldID]))) {
                readField(buf, start, len);
            }
            Tuple t =  mTupleFactory.newTupleNoCopy(mProtoTuple);
            return t;
        } catch (InterruptedException e) {
            int errCode = 6018;
            String errMsg = "Error while reading input";
            throw new ExecException(errMsg, errCode,
                    PigException.REMOTE_ENVIRONMENT, e);
        }

    }

  private void readField(byte[] buf, int start, int end) {
    // remove the field name (example: min=, max=)
    for (int i=start; i<=end;i++) {
      if (buf[i] == '=') {
        start = i + 1;
        break;
      }
    }

    if (start == end) {
      // NULL value
      mProtoTuple.add(null);
    } else {
      mProtoTuple.add(new DataByteArray(buf, start, end));
    }
  }
}
On Mon, Nov 19, 2012 at 12:24 PM, pablomar
<[EMAIL PROTECTED]> wrote:

+
Jonathan Coveney 2012-11-19, 23:32
+
pablomar 2012-11-20, 00:38