Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Drill, mail # dev - [1/6] git commit: DRILL-281 Add broadcast sender


Copy link to this message
-
[1/6] git commit: DRILL-281 Add broadcast sender
jacques@... 2014-01-14, 16:48
Updated Branches:
  refs/heads/master 4a83dae35 -> db0203696
DRILL-281 Add broadcast sender
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/4a872263
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/4a872263
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/4a872263

Branch: refs/heads/master
Commit: 4a87226371c81e0186347ca8d8dead50de5c2de5
Parents: 4a83dae
Author: Timothy Chen <[EMAIL PROTECTED]>
Authored: Tue Nov 19 23:40:14 2013 +0800
Committer: Jacques Nadeau <[EMAIL PROTECTED]>
Committed: Tue Jan 14 08:20:08 2014 -0800

----------------------------------------------------------------------
 .../physical/base/AbstractPhysicalVisitor.java  |   5 +
 .../exec/physical/base/PhysicalVisitor.java     |   1 +
 .../exec/physical/config/BroadcastExchange.java |  79 +++++++++++
 .../exec/physical/config/BroadcastSender.java   |  66 ++++++++++
 .../drill/exec/physical/impl/ScreenCreator.java |   7 +-
 .../broadcastsender/BroadcastSenderCreator.java |  36 +++++
 .../BroadcastSenderRootExec.java                | 131 +++++++++++++++++++
 .../apache/drill/exec/record/WritableBatch.java |   5 +
 .../apache/drill/exec/rpc/bit/BitTunnel.java    |  28 ++++
 .../physical/impl/TestBroadcastExchange.java    |  87 ++++++++++++
 .../resources/sender/broadcast_exchange.json    |  55 ++++++++
 .../sender/broadcast_exchange_long_run.json     |  53 ++++++++
 12 files changed, 548 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index b8a7247..ec7244a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -131,6 +131,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
   }
 
   @Override
+  public T visitBroadcastSender(BroadcastSender op, X value) throws E {
+    return visitSender(op, value);
+  }
+
+  @Override
   public T visitScreen(Screen op, X value) throws E {
     return visitStore(op, value);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 040a495..2e2d7fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -53,6 +53,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
   public RETURN visitMergingReceiver(MergingReceiverPOP op, EXTRA value) throws EXCEP;
   public RETURN visitHashPartitionSender(HashToRandomExchange op, EXTRA value) throws EXCEP;
   public RETURN visitRangeSender(RangeSender op, EXTRA value) throws EXCEP;
+  public RETURN visitBroadcastSender(BroadcastSender op, EXTRA value) throws EXCEP;
   public RETURN visitScreen(Screen op, EXTRA value) throws EXCEP;
   public RETURN visitSingleSender(SingleSender op, EXTRA value) throws EXCEP;
   public RETURN visitUnionExchange(UnionExchange op, EXTRA value) throws EXCEP;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
new file mode 100644
index 0000000..256d3d9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
@@ -0,0 +1,79 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.exec.physical.base.AbstractExchange;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.exec.physical.base.Sender;
+import org.apache.drill.exec.proto.CoordinationProtos;
+
+import java.util.List;