Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Drill, mail # dev - [08/24] status changes


Copy link to this message
-
[08/24] status changes
jacques@... 2014-05-22, 01:14
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 88bada5..5b61a82 100644
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -79,64 +79,69 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
 
   @Override
   public IterOutcome next() {
-
-    // this is only called on the first batch. Beyond this, the aggregator manages batches.
-    if (aggregator == null) {
-      IterOutcome outcome = incoming.next();
-      logger.debug("Next outcome of {}", outcome);
-      switch (outcome) {
-      case NONE:
-      case NOT_YET:
-      case STOP:
-        return outcome;
-      case OK_NEW_SCHEMA:
-        if (!createAggregator()){
-          done = true;
-          return IterOutcome.STOP;
+    stats.startProcessing();
+    try{
+      // this is only called on the first batch. Beyond this, the aggregator manages batches.
+      if (aggregator == null) {
+        IterOutcome outcome = next(incoming);
+        logger.debug("Next outcome of {}", outcome);
+        switch (outcome) {
+        case NONE:
+        case NOT_YET:
+        case STOP:
+          return outcome;
+        case OK_NEW_SCHEMA:
+          if (!createAggregator()){
+            done = true;
+            return IterOutcome.STOP;
+          }
+          break;
+        case OK:
+          throw new IllegalStateException("You should never get a first batch without a new schema");
+        default:
+          throw new IllegalStateException(String.format("unknown outcome %s", outcome));
         }
-        break;
-      case OK:
-        throw new IllegalStateException("You should never get a first batch without a new schema");
-      default:
-        throw new IllegalStateException(String.format("unknown outcome %s", outcome));
       }
-    }
 
-    while(true){
-      AggOutcome out = aggregator.doWork();
-      logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
-      switch(out){
-      case CLEANUP_AND_RETURN:
-        container.clear();
-        done = true;
-        return aggregator.getOutcome();
-      case RETURN_OUTCOME:
-        return aggregator.getOutcome();
-      case UPDATE_AGGREGATOR:
-        aggregator = null;
-        if(!createAggregator()){
-          return IterOutcome.STOP;
+      while(true){
+        AggOutcome out = aggregator.doWork();
+        logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
+        switch(out){
+        case CLEANUP_AND_RETURN:
+          container.clear();
+          done = true;
+          return aggregator.getOutcome();
+        case RETURN_OUTCOME:
+          return aggregator.getOutcome();
+        case UPDATE_AGGREGATOR:
+          aggregator = null;
+          if(!createAggregator()){
+            return IterOutcome.STOP;
+          }
+          continue;
+        default:
+          throw new IllegalStateException(String.format("Unknown state %s.", out));
         }
-        continue;
-      default:
-        throw new IllegalStateException(String.format("Unknown state %s.", out));
       }
+    }finally{
+      stats.stopProcessing();
     }
-    
   }
 
-  
-  
+
+
   /**
    * Creates a new Aggregator based on the current schema. If setup fails, this method is responsible for cleaning up
   * and informing the context of the failure state, as well as informing the upstream operators.
-   *
+   *
    * @return true if the aggregator was setup successfully. false if there was a failure.
    */
   private boolean createAggregator() {
     logger.debug("Creating new aggregator.");
     try{
+      stats.startSetup();
       this.aggregator = createAggregatorInternal();
+      stats.stopSetup();
       return true;
     }catch(SchemaChangeException | ClassTransformationException | IOException ex){
       context.fail(ex);
@@ -153,13 +158,13 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
     ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getFunctionRegistry());
     container.clear();
     List<VectorAllocator> allocators = Lists.newArrayList();
-    
+
     LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getKeys().length];
     LogicalExpression[] valueExprs = new LogicalExpression[popConfig.getExprs().length];
     TypedFieldId[] keyOutputIds = new TypedFieldId[popConfig.getKeys().length];
-    
+
     ErrorCollector collector = new ErrorCollectorImpl();
-    
+
     for(int i =0; i < keyExprs.length; i++){
       NamedExpression ne = popConfig.getKeys()[i];
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector,context.getFunctionRegistry() );
@@ -170,38 +175,38 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       allocators.add(VectorAllocator.getAllocator(vector, 50));
       keyOutputIds[i] = container.add(vector);
     }
-    
+
     for(int i =0; i < valueExprs.length; i++){
       NamedExpression ne = popConfig.getExprs()[i];
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
       if(expr == null) continue;
-      
+
       final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
       ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
       allocators.add(VectorAllocator.getAllocator(vector, 50));
       TypedFieldId id = container.add(vector);
       valueExprs[i] = new ValueVectorWriteExpression(id, e