I have a remote server (EC2) set up with a Kafka cluster. There are 3 brokers, each running on ports 9092, 9093, and 9094, and ZooKeeper is running on port 2181. When I send a message to the brokers from my PC, I get the exception shown below. I took a packet dump on the remote server, and the request does arrive there. I am also able to test the consumer/producer scripts in the bin folder locally on the server. What am I missing? Can you kindly help me with this error? Any help would be greatly appreciated.
[ INFO] [main 2014-01-27 16:06:50,083] Verifying properties
[ INFO] [main 2014-01-27 16:06:50,108] Property metadata.broker.list is overridden to 184.108.40.206:9092,220.127.116.11:9093,18.104.22.168:9094
[ INFO] [main 2014-01-27 16:06:50,108] Property request.required.acks is overridden to 1
[ INFO] [main 2014-01-27 16:06:50,108] Property key.serializer.class is overridden to kafka.serializer.StringEncoder
[ INFO] [main 2014-01-27 16:06:50,108] Property serializer.class is overridden to kafka.utils.EncryptEncoder
[ INFO] [main 2014-01-27 16:06:50,154] send: encrypted - Message_1
[DEBUG] [main 2014-01-27 16:06:50,298] Handling 1 events
[ INFO] [main 2014-01-27 15:59:43,540] Fetching metadata from broker id:0,host:22.214.171.124,port:9093 with correlation id 0 for 1 topic(s) Set(mytopic)
[DEBUG] [main 2014-01-27 15:59:43,737] Created socket with SO_TIMEOUT = 10000 (requested 10000), SO_RCVBUF = 8192 (requested -1), SO_SNDBUF = 102400 (requested 102400).
[ INFO] [main 2014-01-27 15:59:43,738] Connected to 126.96.36.199:9093 for producing
[ INFO] [main 2014-01-27 15:59:44,018] Disconnecting from 188.8.131.52:9093
[DEBUG] [main 2014-01-27 15:59:44,025] Successfully fetched metadata for 1 topic(s) Set(mytopic)
[DEBUG] [main 2014-01-27 15:59:44,058] Getting broker partition info for topic mytopic
[DEBUG] [main 2014-01-27 15:59:44,060] Partition [mytopic,0] has leader 2
[DEBUG] [main 2014-01-27 15:59:44,072] Broker partitions registered for topic: mytopic are 0
[DEBUG] [main 2014-01-27 15:59:44,091] Sending 1 messages with no compression to [mytopic,0]
[DEBUG] [main 2014-01-27 15:59:44,109] Producer sending messages with correlation id 2 for topics [mytopic,0] to broker 2 on ip-10-199-31-87.us-west-1.compute.internal:9093
[ERROR] [main 2014-01-27 15:59:44,129] Producer connection to ip-10-199-31-87.us-west-1.compute.internal:9093 unsuccessful
java.nio.channels.UnresolvedAddressException
    at sun.nio.ch.Net.checkAddress(Net.java:127)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:640)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
    at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:254)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:106)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:100)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
    at kafka.producer.Producer.send(Producer.scala:76)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at kafka.application.KafkaProducer.sendMessage(KafkaProducer.java:39)
    at kafka.test.KafkaProducerTest.main(KafkaProducerTest.java:21)
[ WARN] [main 2014-01-27 15:59:44,139] Failed to send producer request with correlation id 2 to broker 2 with data for partitions [mytopic,0]
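Note that the UnresolvedAddressException happens right after the producer is told the partition leader lives at ip-10-199-31-87.us-west-1.compute.internal:9093 (a name from the log above): the metadata fetch over the raw IP succeeds, but the broker's registered hostname can't be resolved from outside EC2. A quick client-side check is a small resolution probe like the sketch below (ResolveCheck is a hypothetical helper I'm introducing for illustration, not part of Kafka):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class ResolveCheck {
    // Returns true if the JVM can resolve the hostname to an address,
    // which is exactly the step that throws UnresolvedAddressException
    // inside the producer when it fails.
    static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Hostname taken from the producer log above.
        String host = args.length > 0
                ? args[0]
                : "ip-10-199-31-87.us-west-1.compute.internal";
        System.out.println(host + (resolves(host) ? " resolves" : " does NOT resolve"));
    }
}
```

Run from your PC, the internal EC2 name should fail; run from any node inside EC2, it should succeed, which would confirm a name-resolution problem rather than a Kafka one.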
Yeah, I've only skimmed this, but I think I might have something.
All non-VPC EC2 nodes come with an external IP address and an internal IP address. The external IP address is what grants the node access to the internet--it makes the node publicly routable. The mechanism behind the external IP address isn't fully disclosed, but one can infer that there is some NAT device in front of the instance that maps any inbound traffic to the external IP onto the EC2 instance's internal IP.
When deploying distributed systems that employ a service registry (ZooKeeper) in EC2, I advise choosing between connecting only on the internal network or only on the external network. The internal network is usually my preference, for simple privacy reasons.
...anyway, this means all *internal* name resolution needs to be handled by something like /etc/hosts, or by using the internal names and the name services provided by AWS (or, for the intrepid, by setting up internal DNS servers). If you go the internal route, put all of your nodes in the same security group.
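For the /etc/hosts route, the idea is to give every node (and, if you insist on connecting from outside, your desktop) a consistent mapping for the internal names the brokers register. A sketch, with entirely made-up addresses and names:

```
# /etc/hosts -- all values below are hypothetical examples
# Map each broker's registered internal name to an address the
# local machine can actually reach (private IPs inside EC2,
# public IPs if you're resolving from a desktop outside).
10.0.1.11  ip-10-0-1-11.us-west-1.compute.internal  kafka-broker-1
10.0.1.12  ip-10-0-1-12.us-west-1.compute.internal  kafka-broker-2
10.0.1.13  ip-10-0-1-13.us-west-1.compute.internal  kafka-broker-3
10.0.1.10  ip-10-0-1-10.us-west-1.compute.internal  zk-1
```

It's brittle (you maintain it by hand on every machine), which is why the AWS-provided names or a real internal DNS server scale better.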
The issue of network requests timing out smacks of a Kafka node (be it a broker or producer, I didn't parse which one) either picking up an external IP address from ZooKeeper, or of the security groups not allowing access on the needed ports. The default behavior of AWS security groups is to DROP traffic, which can be confusing when you're trying to figure something out.
Lastly, the mention of needing to connect to Kafka (in EC2) from your office desktop makes me think you need to step back and untangle this deployment. Running distributed services in the cloud doesn't magically solve the connectivity issue--especially for distributed systems that register themselves in ZooKeeper.
- Are you running ZooKeeper *and* Kafka across public IP addresses? If not, do you have a VPN to connect to your 'private' network in EC2?
- Are you running ZooKeeper *and* Kafka across private IP addresses in EC2? If so, can they all connect with no timeouts? If so, try running a consumer on an EC2 node.
I'll throw out a scenario that I expect you might be in:
- ZK quorum in EC2, all connected to each other across private IP addresses
- Kafka cluster registered in ZK; the broker ids list either IP addresses or the 'private' names that EC2 provides
- You punched a hole for ZooKeeper connections in your security group in EC2
- Your desktop connects to ZooKeeper over a public IP--assuming that works
- Your desktop finds private names or IP addresses in ZK for the Kafka brokers
- Your desktop can't resolve those internal names, and even if it could, it wouldn't be able to route to those IP addresses
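If that scenario matches, the knob to look at is what name each broker registers in ZooKeeper, since that is what gets handed back to clients in metadata responses. In Kafka 0.8.1+ that's advertised.host.name (and advertised.port) in server.properties; in 0.8.0, host.name is the only option and controls both the bind address and the registered name. A sketch with placeholder values, not taken from the thread:

```
# server.properties for one broker -- values are illustrative placeholders
broker.id=2
port=9093
# The name registered in ZooKeeper and returned to clients in metadata.
# Every client (including your desktop) must be able to resolve AND
# route to it, or you get exactly the UnresolvedAddressException above.
advertised.host.name=ec2-public-name.us-west-1.compute.amazonaws.com
advertised.port=9093
```

The trade-off: advertising the public name makes external clients work but pushes broker-to-broker traffic through public addressing unless routing is set up carefully, which is another reason to keep everything internal and consume from inside EC2.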
Hope this helps, good luck.

On Sun, Feb 23, 2014 at 9:16 PM, Jun Rao <[EMAIL PROTECTED]> wrote: