Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Drill >> mail # dev >> git commit: DRILL-443: Fix potential memory leaks inside HashAggregation and HashTable.


Copy link to this message
-
git commit: DRILL-443: Fix potential memory leaks inside HashAggregation and HashTable.
Repository: incubator-drill
Updated Branches:
  refs/heads/master 4e817d115 -> 84d23350c
DRILL-443: Fix potential memory leaks inside HashAggregation and HashTable.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/84d23350
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/84d23350
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/84d23350

Branch: refs/heads/master
Commit: 84d23350ca278d84dcff48a47537db63707abad5
Parents: 4e817d1
Author: Aman Sinha <[EMAIL PROTECTED]>
Authored: Fri Mar 28 16:12:28 2014 -0700
Committer: Jacques Nadeau <[EMAIL PROTECTED]>
Committed: Sat Mar 29 15:15:40 2014 -0700

 .../physical/impl/aggregate/HashAggBatch.java   |  8 +++-
 .../impl/aggregate/HashAggTemplate.java         | 47 ++++++++++++++------
 .../physical/impl/common/HashTableTemplate.java | 13 +++---
 .../exec/physical/impl/agg/TestHashAggr.java    |  8 ++--
 4 files changed, 50 insertions(+), 26 deletions(-)
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/84d23350/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index d720390..9add544 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -121,17 +121,21 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       }
     }
 
+
     if (aggregator.allFlushed()) {
       return IterOutcome.NONE;
     }
 
+    logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount());
+    
     while(true){
       AggOutcome out = aggregator.doWork();
       logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
       switch(out){
       case CLEANUP_AND_RETURN:
-        container.zeroVectors();
-        aggregator.cleanup();
+        container.clear();
+        aggregator.cleanup();
+        incoming.cleanup();
         done = true;
         return aggregator.getOutcome();
       case RETURN_OUTCOME:

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/84d23350/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 3109124..21c0c7d 100644
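--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java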
@@ -40,6 +40,7 @@ import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
+import org.apache.drill.exec.physical.impl.common.HashTableTemplate.BatchHolder;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
@@ -58,7 +59,8 @@ import com.google.common.collect.Lists;
 public abstract class HashAggTemplate implements HashAggregator {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class);
  
-  private static final boolean EXTRA_DEBUG = false;
+  private static final boolean EXTRA_DEBUG_1 = false;
+  private static final boolean EXTRA_DEBUG_2 = false;
   private static final String TOO_BIG_ERROR = "Couldn't add value to an empty batch.  This likely means that a single value is too long for a varlen field.";
   private boolean first = true;
   private boolean newSchema = false;
@@ -120,7 +122,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     private boolean outputValues() {
       for (int i = 0; i <= maxOccupiedIdx; i++) {
         if (outputRecordValues(i, outputCount) ) {
-          if (EXTRA_DEBUG) logger.debug("Outputting values to {}", outputCount) ;
+          if (EXTRA_DEBUG_2) logger.debug("Outputting values to {}", outputCount) ;
           outputCount++;
         } else {
           return false;
@@ -129,6 +131,10 @@ public abstract class HashAggTemplate implements HashAggregator {
       return true;
     }
 
+    private void clear() {
+      aggrValuesContainer.clear();
+    }
+    
     // Code-generated methods (implemented in HashAggBatch)
 
     @RuntimeOverridden
@@ -199,25 +205,28 @@ public abstract class HashAggTemplate implements HashAggregator {
 
       outside: while(true) {
         // loop through existing records, aggregating the values as necessary.
+        if (EXTRA_DEBUG_1) logger.debug ("Starting outer loop of doWork()...");
         for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
-          if(EXTRA_DEBUG) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
+          if(EXTRA_DEBUG_2) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
           checkGroupAndAggrValues(currentIndex);
         }
 
+        if (EXTRA_DEBUG_1) logger.debug("Processed {} records", underlyingIndex);
+        
         try{
 
           while(true){
             IterOutcome out = incoming.next();
-            if(EXTRA_DEBUG) logger.debug("Received IterOutcome of {}", out);
+            if(EXTRA_DEBUG_1) logger.debug("Received IterOutcome of {}", out);
             switch(out){
             case NOT_YET:
               this.outcome = out;
               return AggOutcome.RETURN_OUTCOME;
              
             case OK_NEW_SCHEMA:
-              if(EXTRA_DEBUG) logger.debug("Received new schema.  Batch has {} records.", incoming.getRecordCount());
-