Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Drill >> mail # dev >> [29/52] [abbrv] [partial] DRILL-165: Reorganize directories (moves only)


Copy link to this message
-
[29/52] [abbrv] [partial] DRILL-165: Reorganize directories (moves only)
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a593d908/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
new file mode 100644
index 0000000..da2d13c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
@@ -0,0 +1,34 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+
+public class BaseRpcOutcomeListener<T> implements RpcOutcomeListener<T> { // RpcOutcomeListener whose callbacks all have empty bodies
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRpcOutcomeListener.class);
+  // NOTE(review): logger is declared but not referenced anywhere in this class.
+  @Override
+  public void failed(RpcException ex) { // empty body: ex is dropped without being logged or rethrown
+  }
+
+  @Override
+  public void success(T value, ByteBuf buffer) { // empty body: value and buffer are ignored (buffer is not released here)
+  }
+  // NOTE(review): presumably subclasses override only the callback they care about -- confirm against callers.
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a593d908/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
new file mode 100644
index 0000000..23dc486
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -0,0 +1,210 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType;
+
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite>
+    extends RpcBus<T, R> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
+
+  private final Bootstrap b;
+  private volatile boolean connect = false;
+  protected R connection;
+  private final T handshakeType;
+  private final Class<HANDSHAKE_RESPONSE> responseClass;
+  private final Parser<HANDSHAKE_RESPONSE> handshakeParser;
+
+  public BasicClient(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
+      Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser) {
+    super(rpcMapping);
+    this.responseClass = responseClass;
+    this.handshakeType = handshakeType;
+    this.handshakeParser = handshakeParser;
+    
+    b = new Bootstrap() //
+        .group(eventLoopGroup) //
+        .channel(NioSocketChannel.class) //
+        .option(ChannelOption.ALLOCATOR, alloc) //
+        .option(ChannelOption.SO_RCVBUF, 1 << 17) //
+        .option(ChannelOption.SO_SNDBUF, 1 << 17) //
+        .handler(new ChannelInitializer<SocketChannel>() {
+
+          @Override
+          protected void initChannel(SocketChannel ch) throws Exception {
+            logger.debug("initializing client connection.");
+            connection = initRemoteConnection(ch);
+            ch.closeFuture().addListener(getCloseHandler(connection));
+            
+            ch.pipeline().addLast( //
+                new ZeroCopyProtobufLengthDecoder(), //
+                new RpcDecoder("c-" + rpcConfig.getName()), //
+                new RpcEncoder("c-" + rpcConfig.getName()), //
+                new ClientHandshakeHandler(