Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Drill, mail # dev - [2/8] git commit: Fix ScreenCreator so that it captures memory leak failure before returning successful result. Fix memory bugs found by fixing memory leak detection error.


Copy link to this message
-
[2/8] git commit: Fix ScreenCreator so that it captures memory leak failure before returning successful result. Fix memory bugs found by fixing memory leak detection error.
jacques@... 2014-03-27, 06:45
Fix ScreenCreator so that it captures memory leak failure before returning successful result.  Fix memory bugs found by fixing memory leak detection error.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5d098b27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5d098b27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5d098b27

Branch: refs/heads/master
Commit: 5d098b27394430ef81e815a241f9757118d1836e
Parents: 129cd77
Author: Jacques Nadeau <[EMAIL PROTECTED]>
Authored: Tue Mar 25 16:29:20 2014 -0700
Committer: Jacques Nadeau <[EMAIL PROTECTED]>
Committed: Wed Mar 26 22:46:38 2014 -0700

 .../codegen/templates/CastFunctionsTargetVarLen.java     |  2 +-
 .../src/main/codegen/templates/RepeatedValueVectors.java |  6 +++---
 .../apache/drill/exec/physical/impl/ScreenCreator.java   |  7 +++++++
 .../exec/physical/impl/aggregate/StreamingAggBatch.java  |  3 ++-
 .../apache/drill/exec/physical/impl/join/JoinStatus.java | 11 +++++++++++
 .../drill/exec/physical/impl/join/MergeJoinBatch.java    |  2 ++
 .../apache/drill/exec/store/mock/MockRecordReader.java   |  2 +-
 .../drill/exec/physical/impl/agg/TestHashAggr.java       |  3 ++-
 8 files changed, 29 insertions(+), 7 deletions(-)
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d098b27/exec/java-exec/src/main/codegen/templates/CastFunctionsTargetVarLen.java
diff --git a/exec/java-exec/src/main/codegen/templates/CastFunctionsTargetVarLen.java b/exec/java-exec/src/main/codegen/templates/CastFunctionsTargetVarLen.java
index c864e72..319ab6b 100644
+++ b/exec/java-exec/src/main/codegen/templates/CastFunctionsTargetVarLen.java
@@ -50,7 +50,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc{
 
   public void setup(RecordBatch incoming) {
     //TODO: max bufferLength should = parameter.len
-    buffer = incoming.getContext().getAllocator().buffer(${type.bufferLength});
+    buffer = io.netty.buffer.Unpooled.wrappedBuffer(new byte[${type.bufferLength}]);
   }
 
   public void eval() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d098b27/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
index 4677374..8a5d506 100644
+++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
@@ -56,7 +56,7 @@ package org.apache.drill.exec.vector;
   }
 
   public int getValueCapacity(){
-    return values.getValueCapacity();
+    return Math.min(values.getValueCapacity(), offsets.getValueCapacity());
   }
  
   public int getBufferSize(){
@@ -324,8 +324,7 @@ package org.apache.drill.exec.vector;
     }
    
     public void generateTestData(){
-      setValueCount(offsets.getAccessor().getValueCount() - 1);
-      int valCount = offsets.getValueCapacity();
+      int valCount = getValueCapacity();
       int[] sizes = {1,2,0,6};
       int size = 0;
       int runningOffset = 0;
@@ -334,6 +333,7 @@ package org.apache.drill.exec.vector;
         offsets.getMutator().set(i, runningOffset);  
       }
       values.getMutator().generateTestData();
+      setValueCount(valCount-1);
     }
    
     public void reset(){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d098b27/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 9a6b3b1..2fc854a 100644
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -70,6 +70,11 @@ public class ScreenCreator implements RootCreator<Screen>{
       this.connection = context.getConnection();
     }
    
+    private void closeAllocator(){
+      sendCount.waitForSendComplete();
+      context.getAllocator().close();
+    }
+    
     @Override
     public boolean next() {
       if(!ok){
@@ -81,6 +86,7 @@ public class ScreenCreator implements RootCreator<Screen>{
 //      logger.debug("Screen Outcome {}", outcome);
       switch(outcome){
       case STOP: {
+          closeAllocator();
           QueryResult header = QueryResult.newBuilder() //
               .setQueryId(context.getHandle().getQueryId()) //
               .setRowCount(0) //
@@ -95,6 +101,7 @@ public class ScreenCreator implements RootCreator<Screen>{
           return false;
       }
       case NONE: {
+        closeAllocator();
         context.getStats().batchesCompleted.inc(1);
         QueryResult header = QueryResult.newBuilder() //
             .setQueryId(context.getHandle().getQueryId()) //

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d098b27/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index fccdbd6..5eff355 100644
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -105,7 +105,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
       switch(out){
       case CLEANUP_AND_RETURN:
-        container.zeroVectors();
+        incoming.cleanup();
+        container.clear();
         done = true;
         return aggregator.getOutcome();
       case RETURN_OUTCOME:

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d098b27/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/phy