Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Plain View
Drill, mail # dev - [07/10] git commit: DRILL-271 Address code review comments. VectorAccessibleSerializable now takes a WritableBatch. Release and reconstruct code is now part of the WritableBatch class.


Copy link to this message
-
[07/10] git commit: DRILL-271 Address code review comments. VectorAccessibleSerializable now takes a WritableBatch. Release and reconstruct code is now part of the WritableBatch class.
jacques@... 2013-11-09, 00:56
DRILL-271 Address code review comments. VectorAccessibleSerializable now takes a WritableBatch. Release and reconstruct code is now part of the WritableBatch class.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b44b6c71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b44b6c71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b44b6c71

Branch: refs/heads/master
Commit: b44b6c719bb4cf7159ef0cb0c6ec172b3273f681
Parents: 0ac0b19
Author: Steven Phillips <[EMAIL PROTECTED]>
Authored: Thu Oct 31 17:29:26 2013 -0700
Committer: Steven Phillips <[EMAIL PROTECTED]>
Committed: Thu Oct 31 17:34:56 2013 -0700

----------------------------------------------------------------------
 .../cache/VectorAccessibleSerializable.java     | 181 ++++++-------------
 .../OrderedPartitionRecordBatch.java            |   6 +-
 .../physical/impl/trace/TraceRecordBatch.java   |  25 ++-
 .../apache/drill/exec/record/WritableBatch.java |  58 ++++++
 .../drill/exec/cache/TestVectorCache.java       |   8 +-
 .../drill/exec/cache/TestWriteToDisk.java       |   8 +-
 6 files changed, 133 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b44b6c71/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index 62f8097..e5bb94b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -22,75 +22,87 @@ import com.google.common.collect.Lists;
 import com.yammer.metrics.MetricRegistry;
 import com.yammer.metrics.Timer;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.CompositeByteBuf;
 import org.apache.drill.common.util.DataInputInputStream;
 import org.apache.drill.common.util.DataOutputOutputStream;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.metrics.DrillMetrics;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.*;
 import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
 
 import java.io.*;
 import java.util.List;
 
+/**
+ * A wrapper around a VectorAccessible. Will serialize a VectorAccessible and write to an OutputStream, or can read
+ * from an InputStream and construct a new VectorContainer.
+ */
 public class VectorAccessibleSerializable implements DrillSerializable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorAccessibleSerializable.class);
   static final MetricRegistry metrics = DrillMetrics.getInstance();
   static final String WRITER_TIMER = MetricRegistry.name(VectorAccessibleSerializable.class, "writerTime");
 
   private VectorAccessible va;
+  private WritableBatch batch;
   private BufferAllocator allocator;
   private int recordCount = -1;
   private BatchSchema.SelectionVectorMode svMode = BatchSchema.SelectionVectorMode.NONE;
   private SelectionVector2 sv2;
-  private int incomingRecordCount;
-  private VectorContainer retainedVectorContainer;
-  private SelectionVector2 retainedSelectionVector;
 
   private boolean retain = false;
 
-  /**
-   *
-   * @param va
-   */
-  public VectorAccessibleSerializable(VectorAccessible va, BufferAllocator allocator){
-    this.va = va;
+  public VectorAccessibleSerializable(BufferAllocator allocator) {
     this.allocator = allocator;
-    incomingRecordCount = va.getRecordCount();
+    this.va = new VectorContainer();
   }
 
-  public VectorAccessibleSerializable(VectorAccessible va, SelectionVector2 sv2, BufferAllocator allocator) {
-    this.va = va;
-    this.allocator = allocator;
-    this.sv2 = sv2;
-    if (sv2 != null) this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
-    incomingRecordCount = va.getRecordCount();
+  public VectorAccessibleSerializable(WritableBatch batch, BufferAllocator allocator){
+    this(batch, null, allocator);
   }
 
-  public VectorAccessibleSerializable(BufferAllocator allocator) {
-    this.va = new VectorContainer();
+  /**
+   * Creates a wrapper around batch and sv2 for writing to a stream. sv2 will never be released by this class, and ownership
+   * is maintained by caller.
+   * @param batch
+   * @param sv2
+   * @param allocator
+   */
+  public VectorAccessibleSerializable(WritableBatch batch, SelectionVector2 sv2, BufferAllocator allocator) {
     this.allocator = allocator;
+    if (batch != null) {
+      this.batch = batch;
+    }
+    if (sv2 != null) {
+      this.sv2 = sv2;
+      this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+    }
   }
 
   @Override
   public void read(DataInput input) throws IOException {
     readFromStream(DataInputInputStream.constructInputStream(input));
   }
-  
+
+  /**
+   * Reads from an InputStream and parses a RecordBatchDef. From this, we construct a SelectionVector2 if it exists
+   * and construct the vectors and add them to a vector container
+   * @param input the InputStream to read from
+   * @throws IOException
+   */
   @Override
   public void readFromStream(InputStream input) throws IOException {
     VectorContainer container = new VectorContainer();
     UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
     recordCount = batchDef.getRecordCoun