Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Drill >> mail # dev >> [2/8] git commit: Fix ScreenCreator so that it captures memory leak failure before returning successful result. Fix memory bugs found by fixing memory leak detection error.


Copy link to this message
-
[2/8] git commit: Fix ScreenCreator so that it captures memory leak failure before returning successful result. Fix memory bugs found by fixing memory leak detection error.
Fix ScreenCreator so that it catches memory-leak failures before returning a successful result. Fix the memory bugs uncovered by correcting the memory-leak detection.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5d098b27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5d098b27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5d098b27

Branch: refs/heads/master
Commit: 5d098b27394430ef81e815a241f9757118d1836e
Parents: 129cd77
Author: Jacques Nadeau <[EMAIL PROTECTED]>
Authored: Tue Mar 25 16:29:20 2014 -0700
Committer: Jacques Nadeau <[EMAIL PROTECTED]>
Committed: Wed Mar 26 22:46:38 2014 -0700

 .../codegen/templates/CastFunctionsTargetVarLen.java     |  2 +-
 .../src/main/codegen/templates/RepeatedValueVectors.java |  6 +++---
 .../apache/drill/exec/physical/impl/ScreenCreator.java   |  7 +++++++
 .../exec/physical/impl/aggregate/StreamingAggBatch.java  |  3 ++-
 .../apache/drill/exec/physical/impl/join/JoinStatus.java | 11 +++++++++++
 .../drill/exec/physical/impl/join/MergeJoinBatch.java    |  2 ++
 .../apache/drill/exec/store/mock/MockRecordReader.java   |  2 +-
 .../drill/exec/physical/impl/agg/TestHashAggr.java       |  3 ++-
 8 files changed, 29 insertions(+), 7 deletions(-)
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d098b27/exec/java-exec/src/main/codegen/templates/CastFunctionsTargetVarLen.java
diff --git a/exec/java-exec/src/main/codegen/templates/CastFunctionsTargetVarLen.java b/exec/java-exec/src/main/codegen/templates/CastFunctionsTargetVarLen.java
index c864e72..319ab6b 100644
+++ b/exec/java-exec/src/main/codegen/templates/CastFunctionsTargetVarLen.java
@@ -50,7 +50,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc{
 
   public void setup(RecordBatch incoming) {
     //TODO: max bufferLength should = parameter.len
-    buffer = incoming.getContext().getAllocator().buffer(${type.bufferLength});
+    buffer = io.netty.buffer.Unpooled.wrappedBuffer(new byte[${type.bufferLength}]);
   }
 
   public void eval() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d098b27/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
index 4677374..8a5d506 100644
+++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
@@ -56,7 +56,7 @@ package org.apache.drill.exec.vector;
   }
 
   public int getValueCapacity(){
-    return values.getValueCapacity();
+    return Math.min(values.getValueCapacity(), offsets.getValueCapacity());
   }
  
   public int getBufferSize(){
@@ -324,8 +324,7 @@ package org.apache.drill.exec.vector;
     }
    
     public void generateTestData(){
-      setValueCount(offsets.getAccessor().getValueCount() - 1);
-      int valCount = offsets.getValueCapacity();
+      int valCount = getValueCapacity();
       int[] sizes = {1,2,0,6};
       int size = 0;
       int runningOffset = 0;
@@ -334,6 +333,7 @@ package org.apache.drill.exec.vector;
         offsets.getMutator().set(i, runningOffset);  
       }
       values.getMutator().generateTestData();
+      setValueCount(valCount-1);
     }
    
     public void reset(){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d098b27/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 9a6b3b1..2fc854a 100644
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -70,6 +70,11 @@ public class ScreenCreator implements RootCreator<Screen>{
       this.connection = context.getConnection();
     }
    
+    private void closeAllocator(){
+      sendCount.waitForSendComplete();
+      context.getAllocator().close();
+    }
+    
     @Override
     public boolean next() {
       if(!ok){
@@ -81,6 +86,7 @@ public class ScreenCreator implements RootCreator<Screen>{
 //      logger.debug("Screen Outcome {}", outcome);
       switch(outcome){
       case STOP: {
+          closeAllocator();
           QueryResult header = QueryResult.newBuilder() //
               .setQueryId(context.getHandle().getQueryId()) //
               .setRowCount(0) //
@@ -95,6 +101,7 @@ public class ScreenCreator implements RootCreator<Screen>{
           return false;
       }
       case NONE: {
+        closeAllocator();
         context.getStats().batchesCompleted.inc(1);
         QueryResult header = QueryResult.newBuilder() //
             .setQueryId(context.getHandle().getQueryId()) //

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d098b27/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index fccdbd6..5eff355 100644
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -105,7 +105,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
       switch(out){
       case CLEANUP_AND_RETURN:
-        container.zeroVectors();
+        incoming.cleanup();
+        container.clear();
         done = true;
         return aggregator.getOutcome();
       case RETURN_OUTCOME:

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d098b27/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/phy