[33/52] [abbrv] [partial] DRILL-165: Reorganize directories (moves only)
jacques@... 2013-09-04, 11:10
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a593d908/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/AggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/AggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/AggBatch.java
new file mode 100644
index 0000000..97e66f9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/AggBatch.java
@@ -0,0 +1,309 @@
+package org.apache.drill.exec.physical.impl.aggregate;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.compile.sig.GeneratorMapping;
+import org.apache.drill.exec.compile.sig.MappingSet;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.CodeGenerator.BlockType;
+import org.apache.drill.exec.expr.CodeGenerator.HoldingContainer;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.HoldingContainerExpression;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.ValueVectorWriteExpression;
+import org.apache.drill.exec.expr.fn.impl.ComparatorFunctions;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JVar;
+
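+/**
+ * Record batch for the StreamingAggregate physical operator. It wraps the
+ * incoming RecordBatch and delegates the per-batch work to a code-generated
+ * Aggregator, rebuilding that aggregator when the incoming schema changes.
+ */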
+public class AggBatch extends AbstractRecordBatch<StreamingAggregate> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AggBatch.class);
+
+  private Aggregator aggregator;
+  private final RecordBatch incoming;
+  private boolean done = false;
+  
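+  // Result of one Aggregator.doWork() call: forward the aggregator's outcome
+  // downstream, tear down and finish, or rebuild the aggregator (e.g. after a
+  // schema change).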
+  public static enum AggOutcome {
+    RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR;
+  }
+
+  public AggBatch(StreamingAggregate popConfig, RecordBatch incoming, FragmentContext context) {
+    super(popConfig, context);
+    this.incoming = incoming;
+  }
+
+  @Override
+  public int getRecordCount() {
+    if(done) return 0;
+    return aggregator.getOutputCount();
+  }
+
+  @Override
+  public IterOutcome next() {
+
+    // this is only called on the first batch. Beyond this, the aggregator manages batches.
+    if (aggregator == null) {
+      IterOutcome outcome = incoming.next();
+      logger.debug("Next outcome of {}", outcome);
+      switch (outcome) {
+      case NONE:
+      case NOT_YET:
+      case STOP:
+        return outcome;
+      case OK_NEW_SCHEMA:
+        if (!createAggregator()){
+          done = true;
+          return IterOutcome.STOP;
+        }
+        break;
+      case OK:
+        throw new IllegalStateException("You should never get a first batch without a new schema");
+      default:
+        throw new IllegalStateException(String.format("unknown outcome %s", outcome));
+      }
+    }
+
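+    // Drive the aggregator until it has an outcome to return, is completely
+    // done, or needs to be recreated for a new schema.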
+    while(true){
+      AggOutcome out = aggregator.doWork();
+      logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
+      switch(out){
+      case CLEANUP_AND_RETURN:
+        container.clear();
+        done = true;
+        return aggregator.getOutcome();
+      case RETURN_OUTCOME:
+        return aggregator.getOutcome();
+      case UPDATE_AGGREGATOR:
+        aggregator = null;
+        if(!createAggregator()){
+          return IterOutcome.STOP;
+        }
+        continue;
+      default:
+        throw new IllegalStateException(String.format("Unknown state %s.", out));
+      }
+    }
+    
+  }
+
+  
+  
+  /**
+   * Creates a new Aggregator based on the current schema. If setup fails, this method is responsible for cleaning up
+   * and informing the context of the failure state, as well as informing the upstream operators.
+   *
+   * @return true if the aggregator was set up successfully, false if there was a failure.
+   */
+  private boolean createAggregator() {
+    logger.debug("Creating new aggregator.");
+    try{
+      this.aggregator = createAggregatorInternal();
+      return true;
+    }catch(SchemaChangeException | ClassTransformationException | IOException ex){
+      context.fail(ex);
+      container.clear();
+      incoming.kill();
+      return false;
+    }
+  }
+
+
+
+
+  private Aggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException{
+    CodeGenerator<Aggregator> cg = new CodeGenerator<Aggregator>(AggTemplate.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+    container.clear();
+    List<VectorAllocator> allocators = Lists.newArrayList();
+    
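+    // One materialized expression per group-by key and per aggregate value,
+    // plus the output field ids that the key columns will be written to.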
+    LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getKeys().length];
+    LogicalExpression[] valueExprs = new LogicalExpression[popConfig.getExprs().length];
+    TypedFieldId[] keyOutputIds = new TypedFieldId[popConfig.getKeys().length];
+