Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, cassandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB
 Search Hadoop and all its subprojects:

Switch to Threaded View
Drill >> mail # dev >> git commit: DRILL-443: Fix potential memory leaks inside HashAggregation and HashTable.


Copy link to this message
-
git commit: DRILL-443: Fix potential memory leaks inside HashAggregation and HashTable.
Repository: incubator-drill
Updated Branches:
  refs/heads/master 4e817d115 -> 84d23350c
DRILL-443: Fix potential memory leaks inside HashAggregation and HashTable.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/84d23350
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/84d23350
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/84d23350

Branch: refs/heads/master
Commit: 84d23350ca278d84dcff48a47537db63707abad5
Parents: 4e817d1
Author: Aman Sinha <[EMAIL PROTECTED]>
Authored: Fri Mar 28 16:12:28 2014 -0700
Committer: Jacques Nadeau <[EMAIL PROTECTED]>
Committed: Sat Mar 29 15:15:40 2014 -0700

 .../physical/impl/aggregate/HashAggBatch.java   |  8 +++-
 .../impl/aggregate/HashAggTemplate.java         | 47 ++++++++++++++------
 .../physical/impl/common/HashTableTemplate.java | 13 +++---
 .../exec/physical/impl/agg/TestHashAggr.java    |  8 ++--
 4 files changed, 50 insertions(+), 26 deletions(-)
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/84d23350/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index d720390..9add544 100644
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -121,17 +121,21 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       }
     }
 
+
     if (aggregator.allFlushed()) {
       return IterOutcome.NONE;
     }
 
+    logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount());
+    
     while(true){
       AggOutcome out = aggregator.doWork();
       logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
       switch(out){
       case CLEANUP_AND_RETURN:
-        container.zeroVectors();
-        aggregator.cleanup();
+        container.clear();
+        aggregator.cleanup();
+        incoming.cleanup();
         done = true;
         return aggregator.getOutcome();
       case RETURN_OUTCOME:

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/84d23350/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 3109124..21c0c7d 100644
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
+import org.apache.drill.exec.physical.impl.common.HashTableTemplate.BatchHolder;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
@@ -58,7 +59,8 @@ import com.google.common.collect.Lists;
 public abstract class HashAggTemplate implements HashAggregator {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class);
  
-  private static final boolean EXTRA_DEBUG = false;
+  private static final boolean EXTRA_DEBUG_1 = false;
+  private static final boolean EXTRA_DEBUG_2 = false;
   private static final String TOO_BIG_ERROR = "Couldn't add value to an empty batch.  This likely means that a single value is too long for a varlen field.";
   private boolean first = true;
   private boolean newSchema = false;
@@ -120,7 +122,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     private boolean outputValues() {
       for (int i = 0; i <= maxOccupiedIdx; i++) {
         if (outputRecordValues(i, outputCount) ) {
-          if (EXTRA_DEBUG) logger.debug("Outputting values to {}", outputCount) ;
+          if (EXTRA_DEBUG_2) logger.debug("Outputting values to {}", outputCount) ;
           outputCount++;
         } else {
           return false;
@@ -129,6 +131,10 @@ public abstract class HashAggTemplate implements HashAggregator {
       return true;
     }
 
+    private void clear() {
+      aggrValuesContainer.clear();
+    }
+    
     // Code-generated methods (implemented in HashAggBatch)
 
     @RuntimeOverridden
@@ -199,25 +205,28 @@ public abstract class HashAggTemplate implements HashAggregator {
 
       outside: while(true) {
         // loop through existing records, aggregating the values as necessary.
+        if (EXTRA_DEBUG_1) logger.debug ("Starting outer loop of doWork()...");
         for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
-          if(EXTRA_DEBUG) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
+          if(EXTRA_DEBUG_2) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
           checkGroupAndAggrValues(currentIndex);
         }
 
+        if (EXTRA_DEBUG_1) logger.debug("Processed {} records", underlyingIndex);
+        
         try{
 
           while(true){
             IterOutcome out = incoming.next();
-            if(EXTRA_DEBUG) logger.debug("Received IterOutcome of {}", out);
+            if(EXTRA_DEBUG_1) logger.debug("Received IterOutcome of {}", out);
             switch(out){
             case NOT_YET:
               this.outcome = out;
               return AggOutcome.RETURN_OUTCOME;
              
             case OK_NEW_SCHEMA:
-              if(EXTRA_DEBUG) logger.debug("Received new schema.  Batch has {} records.", incoming.getRecordCount());
-  
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, cassandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB