Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Drill >> mail # dev >> [2/7] git commit: DRILL-326: Fixes for merge join allocations


Copy link to this message
-
[2/7] git commit: DRILL-326: Fixes for merge join allocations
DRILL-326: Fixes for merge join allocations
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/d3c01968
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/d3c01968
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/d3c01968

Branch: refs/heads/master
Commit: d3c01968ec0afd5d188f23becefaec456d59b168
Parents: e64a682
Author: Jinfeng Ni <[EMAIL PROTECTED]>
Authored: Mon Mar 17 09:01:21 2014 -0700
Committer: Jacques Nadeau <[EMAIL PROTECTED]>
Committed: Mon Mar 17 09:05:28 2014 -0700

 .../exec/physical/impl/join/JoinStatus.java     |  4 +
 .../exec/physical/impl/join/JoinTemplate.java   | 15 ++--
 .../exec/physical/impl/join/MergeJoinBatch.java | 10 ++-
 .../IteratorValidatorBatchIterator.java         |  4 +
 .../apache/drill/exec/record/RecordBatch.java   |  3 +
 .../exec/physical/impl/join/TestMergeJoin.java  | 26 ++++++
 .../src/test/resources/join/join_batchsize.json | 88 ++++++++++++++++++++
 7 files changed, 142 insertions(+), 8 deletions(-)
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3c01968/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index 762cce7..bf87c0a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -101,6 +101,10 @@ public final class JoinStatus {
     outputPosition = 0;
   }
 
+  public final void incOutputPos() {
+    outputPosition++;
+  }
+
   public final void notifyLeftRepeating() {
     leftRepeating = true;
     outputBatch.resetBatchBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3c01968/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index 7c8a51c..f43934e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -95,8 +95,10 @@ public abstract class JoinTemplate implements JoinWorker {
         if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) {
           // we've hit the end of the right record batch; copy any remaining values from the left batch
           while (status.isLeftPositionAllowed()) {
-            if (!doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos()))
+            if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
               return false;
+            
+            status.incOutputPos();  
             status.advanceLeft();
           }
         }
@@ -110,10 +112,11 @@ public abstract class JoinTemplate implements JoinWorker {
 
       case -1:
         // left key < right key
-        if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT)
-          if (!doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos())) {
+        if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) {
+          if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
             return false;
-          }
+          status.incOutputPos();
+        }
         status.advanceLeft();
         continue;
 
@@ -140,9 +143,11 @@ public abstract class JoinTemplate implements JoinWorker {
           if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
             return false;
 
-          if (!doCopyRight(status.getRightPosition(), status.fetchAndIncOutputPos()))
+          if (!doCopyRight(status.getRightPosition(), status.getOutPosition()))
             return false;
          
+          status.incOutputPos();
+          
           // If the left key has duplicates and we're about to cross a boundary in the right batch, add the
           // right table's record batch to the sv4 builder before calling next.  These records will need to be
           // copied again for each duplicate left key.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3c01968/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index bd668e7..7680ff9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -395,16 +395,20 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
   private void allocateBatch() {
     // allocate new batch space.
     container.clear();
-    // add fields from both batches
+    
+    //estimation of joinBatchSize : max of left/right size, expanded by a factor of 16, which is then bounded by MAX_BATCH_SIZE.
+    int joinBatchSize = Math.min(Math.max(left.getRecordCount() , right.getRecordCount() ) * 16, MAX_BATCH_SIZE);
+    
+    // add fields from both batches    
     for (VectorWrapper<?> w : left) {
       ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator());
-      VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / left.getRecordCount())).alloc(left.getRecordCount() * 16);
+      VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / left.getRecordCount())).alloc(joinBatchSize);
       container.add(outgoingVector);
     }
 
     for (VectorWrapper<?> w : right) {
       ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator());
-      VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVect