Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Drill >> mail # dev >> [06/10] git commit: DRILL-271 abstract out serialization in trace record batch by using VectorAccessibleSerializable. Add ability to retain vectors.


Copy link to this message
-
[06/10] git commit: DRILL-271 abstract out serialization in trace record batch by using VectorAccessibleSerializable. Add ability to retain vectors.
DRILL-271 abstract out serialization in trace record batch by using VectorAccessibleSerializable. Add ability to retain vectors.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/0ac0b19b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/0ac0b19b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/0ac0b19b

Branch: refs/heads/master
Commit: 0ac0b19b4bd986e6323799ce54453336821a17f7
Parents: 90c302d
Author: Steven Phillips <[EMAIL PROTECTED]>
Authored: Wed Oct 30 18:22:13 2013 -0700
Committer: Steven Phillips <[EMAIL PROTECTED]>
Committed: Thu Oct 31 17:34:46 2013 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   1 +
 .../cache/VectorAccessibleSerializable.java     | 105 +++++++++++-
 .../physical/impl/trace/TraceRecordBatch.java   | 168 +++----------------
 .../exec/record/AbstractSingleRecordBatch.java  |   1 +
 .../src/main/resources/drill-module.conf        |   3 +-
 .../impl/trace/TestTraceOutputDump.java         |  59 +++----
 6 files changed, 147 insertions(+), 190 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0ac0b19b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 3aec702..36504f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -44,4 +44,5 @@ public interface ExecConstants {
   public static final String BIT_SERVER_RPC_THREADS = "drill.exec.rpc.bit.server.threads";
   public static final String USER_SERVER_RPC_THREADS = "drill.exec.rpc.user.server.threads";
   public static final String TRACE_DUMP_DIRECTORY = "drill.exec.trace.directory";
+  public static final String TRACE_DUMP_FILESYSTEM = "drill.exec.trace.filesystem";
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0ac0b19b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index f4a6998..62f8097 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -17,15 +17,18 @@
  */
 package org.apache.drill.exec.cache;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.yammer.metrics.MetricRegistry;
 import com.yammer.metrics.Timer;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
 import org.apache.drill.common.util.DataInputInputStream;
 import org.apache.drill.common.util.DataOutputOutputStream;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.*;
 import org.apache.drill.exec.record.selection.SelectionVector2;
@@ -46,6 +49,11 @@ public class VectorAccessibleSerializable implements DrillSerializable {
   private int recordCount = -1;
   private BatchSchema.SelectionVectorMode svMode = BatchSchema.SelectionVectorMode.NONE;
   private SelectionVector2 sv2;
+  private int incomingRecordCount;
+  private VectorContainer retainedVectorContainer;
+  private SelectionVector2 retainedSelectionVector;
+
+  private boolean retain = false;
 
   /**
    *
@@ -54,6 +62,7 @@ public class VectorAccessibleSerializable implements DrillSerializable {
   public VectorAccessibleSerializable(VectorAccessible va, BufferAllocator allocator){
     this.va = va;
     this.allocator = allocator;
+    incomingRecordCount = va.getRecordCount();
   }
 
   public VectorAccessibleSerializable(VectorAccessible va, SelectionVector2 sv2, BufferAllocator allocator) {
@@ -61,6 +70,7 @@ public class VectorAccessibleSerializable implements DrillSerializable {
     this.allocator = allocator;
     this.sv2 = sv2;
     if (sv2 != null) this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+    incomingRecordCount = va.getRecordCount();
   }
 
   public VectorAccessibleSerializable(BufferAllocator allocator) {
@@ -108,7 +118,8 @@ public class VectorAccessibleSerializable implements DrillSerializable {
 
   @Override
   public void writeToStream(OutputStream output) throws IOException {
-    final Timer.Context context = metrics.timer(WRITER_TIMER).time();
+    Preconditions.checkNotNull(output);
+    final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time();
     WritableBatch batch = WritableBatch.getBatchNoHVWrap(va.getRecordCount(),va,false);
 
     ByteBuf[] incomingBuffers = batch.getBuffers();
@@ -147,7 +158,9 @@ public class VectorAccessibleSerializable implements DrillSerializable {
 
 //        fc.write(svBuf.nioBuffers());
         svBuf.getBytes(0, output, svBuf.readableBytes());
-        svBuf.release();
+        if (!retain) {
+          svBuf.release();
+        }
       }
 
             /* Dump the array of ByteBuf's associated with the value vectors */
@@ -161,11 +174,17 @@ public class VectorAccessibleSerializable implements DrillSerializable {
                  * we create a compound buffer
                  */
         totalBufferLength += buf.readableBytes();
-        buf.release();
+        if (!r