Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB
 Search Hadoop and all its subprojects:

Switch to Threaded View
Drill >> mail # dev >> [08/24] status changes


Copy link to this message
-
[08/24] status changes
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 88bada5..5b61a82 100644
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -79,64 +79,69 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
 
   @Override
   public IterOutcome next() {
-
-    // this is only called on the first batch. Beyond this, the aggregator manages batches.
-    if (aggregator == null) {
-      IterOutcome outcome = incoming.next();
-      logger.debug("Next outcome of {}", outcome);
-      switch (outcome) {
-      case NONE:
-      case NOT_YET:
-      case STOP:
-        return outcome;
-      case OK_NEW_SCHEMA:
-        if (!createAggregator()){
-          done = true;
-          return IterOutcome.STOP;
+    stats.startProcessing();
+    try{
+      // this is only called on the first batch. Beyond this, the aggregator manages batches.
+      if (aggregator == null) {
+        IterOutcome outcome = next(incoming);
+        logger.debug("Next outcome of {}", outcome);
+        switch (outcome) {
+        case NONE:
+        case NOT_YET:
+        case STOP:
+          return outcome;
+        case OK_NEW_SCHEMA:
+          if (!createAggregator()){
+            done = true;
+            return IterOutcome.STOP;
+          }
+          break;
+        case OK:
+          throw new IllegalStateException("You should never get a first batch without a new schema");
+        default:
+          throw new IllegalStateException(String.format("unknown outcome %s", outcome));
         }
-        break;
-      case OK:
-        throw new IllegalStateException("You should never get a first batch without a new schema");
-      default:
-        throw new IllegalStateException(String.format("unknown outcome %s", outcome));
       }
-    }
 
-    while(true){
-      AggOutcome out = aggregator.doWork();
-      logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
-      switch(out){
-      case CLEANUP_AND_RETURN:
-        container.clear();
-        done = true;
-        return aggregator.getOutcome();
-      case RETURN_OUTCOME:
-        return aggregator.getOutcome();
-      case UPDATE_AGGREGATOR:
-        aggregator = null;
-        if(!createAggregator()){
-          return IterOutcome.STOP;
+      while(true){
+        AggOutcome out = aggregator.doWork();
+        logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
+        switch(out){
+        case CLEANUP_AND_RETURN:
+          container.clear();
+          done = true;
+          return aggregator.getOutcome();
+        case RETURN_OUTCOME:
+          return aggregator.getOutcome();
+        case UPDATE_AGGREGATOR:
+          aggregator = null;
+          if(!createAggregator()){
+            return IterOutcome.STOP;
+          }
+          continue;
+        default:
+          throw new IllegalStateException(String.format("Unknown state %s.", out));
         }
-        continue;
-      default:
-        throw new IllegalStateException(String.format("Unknown state %s.", out));
       }
+    }finally{
+      stats.stopProcessing();
     }
-    
   }
 
-  
-  
+
+
   /**
    * Creates a new Aggregator based on the current schema. If setup fails, this method is responsible for cleaning up
    * and informing the context of the failure state, as well is informing the upstream operators.
-   *
+   *
    * @return true if the aggregator was setup successfully. false if there was a failure.
    */
   private boolean createAggregator() {
     logger.debug("Creating new aggregator.");
     try{
+      stats.startSetup();
       this.aggregator = createAggregatorInternal();
+      stats.stopSetup();
       return true;
     }catch(SchemaChangeException | ClassTransformationException | IOException ex){
       context.fail(ex);
@@ -153,13 +158,13 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
     ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getFunctionRegistry());
     container.clear();
     List<VectorAllocator> allocators = Lists.newArrayList();
-    
+
     LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getKeys().length];
     LogicalExpression[] valueExprs = new LogicalExpression[popConfig.getExprs().length];
     TypedFieldId[] keyOutputIds = new TypedFieldId[popConfig.getKeys().length];
-    
+
     ErrorCollector collector = new ErrorCollectorImpl();
-    
+
     for(int i =0; i < keyExprs.length; i++){
       NamedExpression ne = popConfig.getKeys()[i];
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector,context.getFunctionRegistry() );
@@ -170,38 +175,38 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       allocators.add(VectorAllocator.getAllocator(vector, 50));
       keyOutputIds[i] = container.add(vector);
     }
-    
+
     for(int i =0; i < valueExprs.length; i++){
       NamedExpression ne = popConfig.getExprs()[i];
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
       if(expr == null) continue;
-      
+
       final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
       ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
       allocators.add(VectorAllocator.getAllocator(vector, 50));
       TypedFieldId id = container.add(vector);
       valueExprs[i] = new ValueVectorWriteExpression(id, e
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB