Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Drill, mail # dev - [3/5] git commit: implement SV4 support for filter


Copy link to this message
-
[3/5] git commit: implement SV4 support for filter
jacques@... 2013-10-31, 00:50
implement SV4 support for filter
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/4481dadc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/4481dadc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/4481dadc

Branch: refs/heads/master
Commit: 4481dadcec3d96638274469e695053c4d7c95805
Parents: a73512d
Author: Ben Becker <[EMAIL PROTECTED]>
Authored: Fri Aug 30 09:13:14 2013 -0700
Committer: Jacques Nadeau <[EMAIL PROTECTED]>
Committed: Wed Oct 30 17:21:53 2013 -0700

----------------------------------------------------------------------
 .../physical/impl/filter/FilterRecordBatch.java | 118 +++++++++++++++----
 .../physical/impl/filter/FilterTemplate.java    | 103 ----------------
 .../physical/impl/filter/FilterTemplate2.java   |  88 ++++++++++++++
 .../physical/impl/filter/FilterTemplate4.java   |  52 ++++++++
 .../exec/physical/impl/filter/Filterer.java     |   3 +-
 .../exec/record/selection/SelectionVector4.java |  11 +-
 .../exec/physical/impl/SimpleRootExec.java      |   5 +
 .../physical/impl/filter/TestSimpleFilter.java  |  30 ++++-
 .../src/test/resources/filter/test_sv4.json     |  42 +++++++
 9 files changed, 323 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4481dadc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index 5f9a06a..e67e531 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.filter;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -27,27 +28,29 @@ import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.record.*;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.AbstractSingleRecordBatch;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TransferPair;
-import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.TypeHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.collect.Lists;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
 
 public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
 
-  private final SelectionVector2 sv;
+  private SelectionVector2 sv2;
+  private SelectionVector4 sv4;
+  private BufferAllocator.PreAllocator svAllocator;
   private Filterer filter;
-  
-  public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext context){
+
+  public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext context) {
     super(pop, context, incoming);
-    sv = new SelectionVector2(context.getAllocator());
   }
  
   @Override
@@ -57,18 +60,22 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
 
   @Override
   public int getRecordCount() {
-    return sv.getCount();
+    return sv2 != null ? sv2.getCount() : sv4.getCount();
   }
 
   @Override
   public SelectionVector2 getSelectionVector2() {
-    return sv;
+    return sv2;
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    return sv4;
   }
 
   @Override
   protected void doWork() {
     int recordCount = incoming.getRecordCount();
-    sv.allocateNew(recordCount);
     filter.filterBatch(recordCount);
     for(VectorWrapper<?> v : container){
       ValueVector.Mutator m = v.getValueVector().getMutator();
@@ -79,33 +86,102 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
   @Override
   protected void setupNewSchema() throws SchemaChangeException {
     container.clear();
-    LogicalExpression filterExpression = popConfig.getExpr();
+
+    switch(incoming.getSchema().getSelectionVectorMode()){
+      case NONE:
+        sv2 = new SelectionVector2(context.getAllocator());
+        this.filter = generateSV2Filterer();
+        break;
+      case TWO_BYTE:
+        sv2 = new SelectionVector2(context.getAllocator());
+        this.filter = generateSV2Filterer();
+        break;
+      case FOUR_BYTE:
+        // set up the multi-batch selection vector
+        this.svAllocator = context.getAllocator().getPreAllocator();
+        if (!svAllocator.preAllocate(incoming.getRecordCount()*4))
+          throw new SchemaChangeException("Attempted to filter an SV4 which exceeds allowed memory (" +
+                                          incoming.getRecordCount() * 4 + " bytes)");
+        sv4 = new SelectionVector4(svAllocator.getAllocation(), incoming.getRecordCount(), Character.MAX_VALU