Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Drill >> mail # dev >> [07/15] DRILL-334: Subdivide Drillbit control and data messages. Add support for socket backpressure. Add TopLevel and Child memory allocator with debug mode to capture memory leaks. Various memory leak fixes to get build to complete.


Copy link to this message
-
[07/15] DRILL-334: Subdivide Drillbit control and data messages. Add support for socket backpressure. Add TopLevel and Child memory allocator with debug mode to capture memory leaks. Various memory leak fixes to get build to complete.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a0eabf63/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
new file mode 100644
index 0000000..e9302e1
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.RootExec;
+import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.BitControl.FragmentStatus.FragmentState;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.control.Controller;
+import org.apache.drill.exec.rpc.control.WorkEventBus;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.work.EndpointListener;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
+import org.apache.drill.exec.work.batch.IncomingBuffers;
+import org.apache.drill.exec.work.foreman.Foreman.ForemanManagerListener;
+import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
+import org.apache.drill.exec.work.fragment.FragmentExecutor;
+import org.apache.drill.exec.work.fragment.RootFragmentManager;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Each Foreman holds its own fragment manager.  This manages the events associated with execution of a particular query across all fragments.  
+ */
+class QueryManager implements FragmentStatusListener{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class);
+  
+  public Map<FragmentHandle, FragmentData> map = Maps.newHashMap(); // doesn't need to be thread safe as map is generated in a single thread and then accessed by multiple threads for reads only.
+  private final Controller controller;
+  private ForemanManagerListener foreman;
+  private AtomicInteger remainingFragmentCount;
+  private WorkEventBus workBus;
+  private FragmentExecutor rootRunner;
+  private volatile QueryId queryId;
+  
+  public QueryManager(ForemanManagerListener foreman, Controller controller) {
+    super();
+    this.foreman = foreman;
+    this.controller = controller;
+    this.remainingFragmentCount = new AtomicInteger(0);
+    
+  }
+
+  public void runFragments(WorkerBee bee, PlanFragment rootFragment, FragmentRoot rootOperator, UserClientConnection rootClient, List<PlanFragment> leafFragments, List<PlanFragment> intermediateFragments) throws ExecutionSetupException{
+    remainingFragmentCount.set(leafFragments.size()+1);
+    queryId = rootFragment.getHandle().getQueryId();
+    workBus = bee.getContext().getWorkBus();
+
+    // set up the root fragment first so we'll have incoming buffers available.
+    {
+      FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment, rootClient, new FunctionImplementationRegistry(bee.getContext().getConfig()));
+      IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
+      rootContext.setBuffers(buffers);
+      RootExec rootExec = ImplCreator.getExec(rootContext, rootOperator);
+      // add fragment to local node.
+      map.put(rootFragment.getHandle(), new FragmentData(rootFragment.getHandle(), null, true));
+      rootRunner = new FragmentExecutor(rootContext, rootExec, new RootStatusHandler(rootContext, rootFragment));
+      RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
+      
+      if(buffers.isDone()){
+        // if we don't have to wait for any incoming data, start the fragment runner.
+        bee.addFragmentRunner(fragmentManager.getRunnable());
+      }else{
+        // if we do, record the fragment manager in the workBus.
+        workBus.setRootFragmentManager(fragmentManager);  
+      }
+      
+    }
+
+    // keep track of intermediate fragments (not root or leaf)
+    for (PlanFragment f : intermediateFragments) {
+      logger.debug("Tracking intermediate remote node {} with data {}", f.getAssignment(), f.getFragmentJson());
+      map.put(f.getHandle