Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Drill, mail # dev - git commit: DRILL-299: OutgoingRecordBatch trying to get RecordCount on incoming batch with outcome NOT_YET


Copy link to this message
-
git commit: DRILL-299: OutgoingRecordBatch trying to get RecordCount on incoming batch with outcome NOT_YET
jacques@... 2013-11-16, 00:55
Updated Branches:
  refs/heads/master 0e830960f -> b07682084
DRILL-299: OutgoingRecordBatch trying to get RecordCount on incoming batch with outcome NOT_YET
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b0768208
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b0768208
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b0768208

Branch: refs/heads/master
Commit: b07682084da96469e310028e67b365d005f99bdb
Parents: 0e83096
Author: Ben Becker <[EMAIL PROTECTED]>
Authored: Fri Nov 15 16:35:12 2013 -0800
Committer: Jacques Nadeau <[EMAIL PROTECTED]>
Committed: Fri Nov 15 16:35:12 2013 -0800

----------------------------------------------------------------------
 .../partitionsender/OutgoingRecordBatch.java    | 41 ++++----------------
 .../physical/impl/TestHashToRandomExchange.java |  2 -
 2 files changed, 8 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b0768208/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index 2647ffc..081b4c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SchemaBuilder;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
@@ -50,7 +51,7 @@ import com.google.common.base.Preconditions;
  * next() will never be called on this object.  When a record batch is ready to send (e.g. nearing size
  * limit or schema change), call flush() to send the batch.
  */
-public class OutgoingRecordBatch implements RecordBatch {
+public class OutgoingRecordBatch implements VectorAccessible {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OutgoingRecordBatch.class);
 
   private BitTunnel tunnel;
@@ -64,6 +65,8 @@ public class OutgoingRecordBatch implements RecordBatch {
   private int recordCount;
   private int recordCapacity;
   private int oppositeMinorFragmentId;
+  private static int DEFAULT_ALLOC_SIZE = 20000;
+  private static int DEFAULT_VARIABLE_WIDTH_SIZE = 2048;
 
   public OutgoingRecordBatch(HashPartitionSender operator, BitTunnel tunnel, RecordBatch incoming, FragmentContext context, int oppositeMinorFragmentId) {
     this.incoming = incoming;
@@ -71,7 +74,6 @@ public class OutgoingRecordBatch implements RecordBatch {
     this.operator = operator;
     this.tunnel = tunnel;
     this.oppositeMinorFragmentId = oppositeMinorFragmentId;
-    initializeBatch();
   }
 
   public void flushIfNecessary() {
@@ -137,8 +139,8 @@ public class OutgoingRecordBatch implements RecordBatch {
     // must remain valid.
     recordCount = 0;
     for (VectorWrapper<?> v : vectorContainer) {
-      logger.debug("Reallocating vv to capacity " + incoming.getRecordCount() + " after flush.");
-      VectorAllocator.getAllocator(v.getValueVector(), v.getValueVector()).alloc(incoming.getRecordCount());
+      logger.debug("Reallocating vv to capacity " + DEFAULT_ALLOC_SIZE + " after flush.");
+      VectorAllocator.getAllocator(v.getValueVector(), DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_ALLOC_SIZE);
     }
     if (!ok) { throw new SchemaChangeException("Flush ended NOT OK!"); }
     return true;
@@ -150,7 +152,7 @@ public class OutgoingRecordBatch implements RecordBatch {
    */
   public void initializeBatch() {
     isLast = false;
-    recordCapacity = incoming.getRecordCount();
+    recordCapacity = DEFAULT_ALLOC_SIZE;
     vectorContainer = new VectorContainer();
 
     SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
@@ -186,17 +188,6 @@ public class OutgoingRecordBatch implements RecordBatch {
   }
 
   @Override
-  public IterOutcome next() {
-    assert false;
-    return IterOutcome.STOP;
-  }
-
-  @Override
-  public FragmentContext getContext() {
-    return context;
-  }
-
-  @Override
   public BatchSchema getSchema() {
     Preconditions.checkNotNull(outSchema);
     return outSchema;
@@ -208,21 +199,6 @@ public class OutgoingRecordBatch implements RecordBatch {
   }
 
   @Override
-  public void kill() {
-    incoming.kill();
-  }
-
-  @Override
-  public SelectionVector2 getSelectionVector2() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public SelectionVector4 getSelectionVector4() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
   public TypedFieldId getValueVectorId(SchemaPath path) {
     return vectorContainer.getValueVectorId(path);
   }
@@ -237,9 +213,8 @@ public class OutgoingRecordBatch implements RecordBatch {
     return vectorContainer.iterator();
   }
 
-  @Override
   public WritableBatch getWritableBatch() {
-    return WritableBatch.get(this);
+    return WritableBatch.getBatchNoHVWrap(recordCount, this, false);
   }
 
  

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b0768208/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExc