http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a0eabf63/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
new file mode 100644
index 0000000..e9302e1
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.RootExec;
+import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.BitControl.FragmentStatus.FragmentState;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.control.Controller;
+import org.apache.drill.exec.rpc.control.WorkEventBus;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.work.EndpointListener;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
+import org.apache.drill.exec.work.batch.IncomingBuffers;
+import org.apache.drill.exec.work.foreman.Foreman.ForemanManagerListener;
+import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
+import org.apache.drill.exec.work.fragment.FragmentExecutor;
+import org.apache.drill.exec.work.fragment.RootFragmentManager;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Each Foreman holds its own QueryManager, which manages the events associated with the execution of a particular query across all of its fragments.
+ */
+class QueryManager implements FragmentStatusListener{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class); // SLF4J logger shared by all instances of this class
+  
+  public Map<FragmentHandle, FragmentData> map = Maps.newHashMap(); // Tracking data keyed by fragment handle; filled in by runFragments. Doesn't need to be thread safe, as the map is generated in a single thread and then accessed by multiple threads for reads only.
+  private final Controller controller; // supplied to the constructor
+  private ForemanManagerListener foreman; // supplied to the constructor
+  private AtomicInteger remainingFragmentCount; // created at 0 in the constructor; runFragments sets it to leafFragments.size() + 1
+  private WorkEventBus workBus; // taken from bee.getContext().getWorkBus() in runFragments
+  private FragmentExecutor rootRunner; // executor for the root fragment, built in runFragments
+  private volatile QueryId queryId; // read from the root fragment's handle in runFragments
+  
+  /**
+   * Creates a query manager that keeps the given foreman listener and controller.
+   * The remaining-fragment counter starts at zero.
+   *
+   * @param foreman    listener stored in the {@code foreman} field
+   * @param controller controller stored in the {@code controller} field
+   */
+  public QueryManager(ForemanManagerListener foreman, Controller controller) {
+    // The superclass constructor is invoked implicitly; AtomicInteger() defaults to 0.
+    this.remainingFragmentCount = new AtomicInteger();
+    this.controller = controller;
+    this.foreman = foreman;
+  }
+
+  public void runFragments(WorkerBee bee, PlanFragment rootFragment, FragmentRoot rootOperator, UserClientConnection rootClient, List<PlanFragment> leafFragments, List<PlanFragment> intermediateFragments) throws ExecutionSetupException{
+    remainingFragmentCount.set(leafFragments.size()+1);
+    queryId = rootFragment.getHandle().getQueryId();
+    workBus = bee.getContext().getWorkBus();
+
+    // set up the root fragment first so we'll have incoming buffers available.
+    {
+      FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment, rootClient, new FunctionImplementationRegistry(bee.getContext().getConfig()));
+      IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
+      rootContext.setBuffers(buffers);
+      RootExec rootExec = ImplCreator.getExec(rootContext, rootOperator);
+      // add fragment to local node.
+      map.put(rootFragment.getHandle(), new FragmentData(rootFragment.getHandle(), null, true));
+      rootRunner = new FragmentExecutor(rootContext, rootExec, new RootStatusHandler(rootContext, rootFragment));
+      RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
+      
+      if(buffers.isDone()){
+        // if we don't have to wait for any incoming data, start the fragment runner.
+        bee.addFragmentRunner(fragmentManager.getRunnable());
+      }else{
+        // if we do, record the fragment manager in the workBus.
+        workBus.setRootFragmentManager(fragmentManager);  
+      }
+      
+    }
+
+    // keep track of intermediate fragments (not root or leaf)
+    for (PlanFragment f : intermediateFragments) {
+      logger.debug("Tracking intermediate remote node {} with data {}", f.getAssignment(), f.getFragmentJson());
+      map.put(f.getHandle
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, cassandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB