Kafka, mail # user - testing issue with reliable sending


testing issue with reliable sending
Jason Rosenberg 2013-10-04, 18:21
All,

I'm having an issue with an integration test I've set up.  This is using
0.8-beta1.

The test verifies that no messages are dropped (or that the sender gets an
exception thrown back on failure) during a rolling restart of a cluster of
2 brokers.

The producer is configured to use 'request.required.acks' = '1'.

The servers are set up to run locally on localhost, on different ports and
with different data dirs.  The producer connects with a metadata brokerlist
like:  "localhost:2024,localhost:1025" (so no VIP).  The servers are set
up with a default replication factor of 2.  The servers have controlled
shutdown enabled, as well.
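
For reference, the setup above could be written down roughly as follows. This is only a sketch: the class and method names are made up for illustration, the broker id, port and log dir are placeholder parameters, and the property keys are the 0.8 names as I understand them, so they are worth checking against the configuration docs for the exact release.

```java
import java.util.Properties;

// Hypothetical helper (not part of the actual test) that collects the
// settings described above into plain java.util.Properties objects.
class ReliableSendConfigSketch {

    // Producer side: broker list and ack level as stated in the post.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:2024,localhost:1025");
        // Wait for the partition leader to acknowledge each request.
        props.put("request.required.acks", "1");
        return props;
    }

    // Broker side: one instance per local server. The id, port and logDir
    // arguments are placeholders chosen by the caller.
    static Properties brokerProps(int brokerId, int port, String logDir) {
        Properties props = new Properties();
        props.put("broker.id", Integer.toString(brokerId));
        props.put("port", Integer.toString(port));
        props.put("log.dir", logDir);
        props.put("default.replication.factor", "2");
        props.put("controlled.shutdown.enable", "true");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps());
        System.out.println(brokerProps(1, 1025, "/tmp/kafka-test-1"));
    }
}
```

The producer properties would then be wrapped in a ProducerConfig and handed to the Producer constructor inside getProducer().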

The producer code looks like this:
    ...
    Producer<Integer, T> producer = getProducer();
    try {
      KeyedMessage<Integer, T> msg = new KeyedMessage<Integer, T>(topic, message);
      producer.send(msg);
      return true;
    } catch (RuntimeException e) {
      logger.warn("Error sending data to kafka", e);
      return false;
    }
    ...

The test sends groups of messages at each stage of the test (e.g. servers
up, first server going down, first server coming up, second server going
down, etc.).  Then a consumer connects and consumes all the messages, to
make sure they all arrived ok.
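
The check at the end amounts to a set difference: every message for which the send call returned true must show up in what the consumer reads back. A minimal sketch of that bookkeeping is below. The DeliveryAudit class and the "stage:message" id format are invented for illustration and are not the actual test code.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical bookkeeping for the test: remembers which sends were
// reported as successful and compares them with what was consumed.
class DeliveryAudit {
    private final Set<String> acknowledged = new LinkedHashSet<String>();
    private final Set<String> failed = new LinkedHashSet<String>();

    // Record the outcome of one send attempt (true = no exception thrown).
    void record(String messageId, boolean sendReturnedTrue) {
        if (sendReturnedTrue) {
            acknowledged.add(messageId);
        } else {
            failed.add(messageId);
        }
    }

    // Messages the producer reported as sent but the consumer never saw.
    // A non-empty result is the failure the test is looking for.
    Set<String> missingFrom(Collection<String> consumed) {
        Set<String> missing = new LinkedHashSet<String>(acknowledged);
        missing.removeAll(consumed);
        return missing;
    }

    // Sends that threw; these may legitimately be absent on the consumer side.
    Set<String> failedSends() {
        return new LinkedHashSet<String>(failed);
    }
}
```

A message that shows up in failedSends() is an acceptable outcome, since the caller was told about it; only entries returned by missingFrom() count as silently dropped.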

Intermittently, a single message gets dropped right after one of the
servers starts going down.  It doesn't always happen; it seems to happen in
about 1 out of every 20 test runs.  Here's some sample output.  I see the
exception logged inside the producer code, but the producer.send method
never throws an exception back out to the caller (the log line
"Error sending data to kafka" is never triggered).

What's interesting is that it looks like the exceptions are happening on
message 3, but when the consumer subsequently consumes back all the
messages in the broker cluster, it seems message 2 (and not message 3) is
missing:

...
...
7136 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 3, message: 98
7150 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 3, message: 99
7163 [Thread-2] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Shutting down server2
7163 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 4, message: 0
7164 [Thread-20] INFO com.squareup.kafka.ng.server.KafkaServer  - Shutting down KafkaServer
7176 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 4, message: 1
7189 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 4, message: 2
7203 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 4, message: 3
7394 [kafka-request-handler-5] WARN state.change.logger  - Broker 1946108683 received update metadata request with correlation id 7 from an old controller 178709090 with epoch 2. Latest known controller epoch is 3
7397 [kafka-request-handler-5] ERROR kafka.server.KafkaApis  - [KafkaApi-1946108683] error when handling request Name:UpdateMetadataRequest;Version:0;Controller:178709090;ControllerEpoch:2;CorrelationId:7;ClientId:id_178709090-host_null-port_1026;PartitionState:[test-topic,0] -> (LeaderAndIsrInfo:(Leader:1946108683,ISR:1946108683,LeaderEpoch:2,ControllerEpoch:2),ReplicationFactor:2),AllReplicas:178709090,1946108683);AliveBrokers:id:1946108683,host:192.168.1.105,port:1025,id:178709090,host:192.168.1.105,port:1026
kafka.common.ControllerMovedException: Broker 1946108683 received update metadata request with correlation id 7 from an old controller 178709090 with epoch 2. Latest known controller epoch is 3
at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:114)
at kafka.server.KafkaApis.handle(KafkaApis.scala:72)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
at java.lang.Thread.run(Thread.java:724)
8039 [Controller-178709090-to-broker-178709090-send-thread] WARN kafka.controller.RequestSendThread  - [Controller-178709090-to-broker-178709090-send-thread], Controller 178709090 fails to send a request to broker 178709090
java.nio.channels.AsynchronousCloseException
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205)
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:387)
at kafka.utils.Utils$.read(Utils.scala:394)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:127)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
8041 [Thread-2] INFO com.squareup.kafka.ng.server.KafkaServer  - Shut down complete for KafkaServer
17209 [Thread-1] WARN kafka.producer.async.DefaultEventHandler  - Failed to send producer request with correlation id 810 to broker 178709090 with data for partitions [test-topic,0]
java.net.SocketTimeoutException
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:226)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
at kafka.utils.Utils$.read(Utils.scala:394)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
at
kaf