[jira] [Commented] (KAFKA-772) System Test Transient Failure on testcase_0122

    [ https://issues.apache.org/jira/browse/KAFKA-772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13592850#comment-13592850 ]

Sriram Subramanian commented on KAFKA-772:
------------------------------------------

The test failed on Monday and then again on Friday, so it was clear that the issue was timing related. We tried to reproduce the failure on a local box by running the test repeatedly, but could not. I did some code browsing but did not have much luck, so I decided to set up tracing and run the test repeatedly in a distributed environment over the weekend, hoping that it would fail. Luckily it did, and the trace logs proved useful in identifying the issue. Thanks to John for setting this up.

What you see below are excerpts from the trace logs that pertain to this failure at different points in time. In this particular failure, test_2 / partition 2 had missing logical offsets from 570 to 582 on broker 3 (3 brokers in total).

current fetch offset = 582
current HW = 570
Leader for test_2/partition 2 = broker 2

1. The lines below show the fetch request that was issued by broker 3 to broker 2 just before broker 1 was shut down. The requested offset is 582 for [test_2,2].

[2013-03-02 12:37:56,034] TRACE [ReplicaFetcherThread-0-2], issuing to broker 2 of fetch request Name: FetchRequest; Version: 0; CorrelationId: 121; ClientId: ReplicaFetcherThread-0-2; ReplicaId: 3; MaxWait: 500 ms; MinBytes: 4096 bytes; RequestInfo: [test_1,0] -> PartitionFetchInfo(700,1048576),[test_2,1] -> PartitionFetchInfo(677,1048576),[test_2,2] -> PartitionFetchInfo(582,1048576),[test_2,0] -> PartitionFetchInfo(679,1048576),[test_1,2] -> PartitionFetchInfo(600,1048576),[test_1,1] -> PartitionFetchInfo(699,1048576) (kafka.server.ReplicaFetcherThread)

2. Broker 1 is shut down and broker 3 handles the leader and isr request. Note that [test_2,2] still follows broker 2, but we still issue a makeFollower call for it.

[2013-03-02 12:37:56,086] INFO Replica Manager on Broker 3: Handling leader and isr request Name: LeaderAndIsrRequest; Version: 0; CorrelationId: 2; ClientId: ; AckTimeoutMs: 1000 ms; ControllerEpoch: 2; PartitionStateInfo: (test_1,0) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,1,3", "leader":"2", "leaderEpoch":"1" },1),3),(test_2,1) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,3", "leader":"2", "leaderEpoch":"2" },2),3),(test_2,2) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,1,3", "leader":"2", "leaderEpoch":"1" },1),3),(test_2,0) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,3", "leader":"2", "leaderEpoch":"2" },2),3),(test_1,2) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,3", "leader":"2", "leaderEpoch":"2" },2),3),(test_1,1) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,1,3", "leader":"2", "leaderEpoch":"1" },1),3); Leaders: id:2,host:xxxx(kafka.server.ReplicaManager)

3. The leader and isr request results in removing the fetcher to broker 2 for [test_2,2], truncating the log to the high watermark (570) and then adding the fetcher back to the same broker. A toy sketch of this sequence follows the log lines below.

[2013-03-02 12:37:56,088] INFO [ReplicaFetcherManager on broker 3] removing fetcher on topic test_2, partition 2 (kafka.server.ReplicaFetcherManager)
[2013-03-02 12:37:56,088] INFO [Kafka Log on Broker 3], Truncated log segment /tmp/kafka_server_3_logs/test_2-2/00000000000000000000.log to target offset 570 (kafka.log.Log)
[2013-03-02 12:37:56,088] INFO [ReplicaFetcherManager on broker 3] adding fetcher on topic test_2, partion 2, initOffset 570 to broker 2 with fetcherId 0 (kafka.server.ReplicaFetcherManager)
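
To make the bookkeeping concrete, here is a minimal toy sketch of the sequence in those three log lines. ToyLog and ToyFetcherManager are made-up stand-ins, not Kafka's Log or ReplicaFetcherManager classes:

object BecomeFollowerSketch {

  // Toy replica log: just the set of logical offsets it contains.
  final class ToyLog(private var offsets: Vector[Long]) {
    def logEndOffset: Long = offsets.lastOption.map(_ + 1).getOrElse(0L)
    def truncateTo(target: Long): Unit = offsets = offsets.filter(_ < target)
  }

  // Toy fetcher manager: tracks the offset each (topic, partition) fetcher starts from.
  final class ToyFetcherManager {
    private var fetchOffsets = Map.empty[(String, Int), Long]
    def removeFetcher(tp: (String, Int)): Unit = fetchOffsets -= tp
    def addFetcher(tp: (String, Int), initOffset: Long): Unit = fetchOffsets += tp -> initOffset
    def fetchOffset(tp: (String, Int)): Option[Long] = fetchOffsets.get(tp)
  }

  def main(args: Array[String]): Unit = {
    val tp = ("test_2", 2)
    val highWatermark = 570L
    // Broker 3's replica holds offsets 0..581, so its next fetch is at 582.
    val log = new ToyLog((0L until 582L).toVector)
    val fetchers = new ToyFetcherManager
    fetchers.addFetcher(tp, log.logEndOffset)

    // The sequence from the three log lines above:
    fetchers.removeFetcher(tp)              // "removing fetcher on topic test_2, partition 2"
    log.truncateTo(highWatermark)           // "Truncated log segment ... to target offset 570"
    fetchers.addFetcher(tp, highWatermark)  // "adding fetcher ... initOffset 570"

    println(s"log end offset = ${log.logEndOffset}")                   // 570
    println(s"fetcher restarts at offset ${fetchers.fetchOffset(tp)}") // Some(570)
  }
}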

4. The leader and isr request is completed at this point.

[2013-03-02 12:37:56,090] INFO Replica Manager on Broker 3: Completed leader and isr request Name: LeaderAndIsrRequest; Version: 0; CorrelationId: 2; ClientId: ; AckTimeoutMs: 1000 ms; ControllerEpoch: 2; PartitionStateInfo: (test_1,0) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,1,3", "leader":"2", "leaderEpoch":"1" },1),3),(test_2,1) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,3", "leader":"2", "leaderEpoch":"2" },2),3),(test_2,2) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,1,3", "leader":"2", "leaderEpoch":"1" },1),3),(test_2,0) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,3", "leader":"2", "leaderEpoch":"2" },2),3),(test_1,2) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,3", "leader":"2", "leaderEpoch":"2" },2),3),(test_1,1) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,1,3", "leader":"2", "leaderEpoch":"1" },1),3); Leaders: id:2,host:xxxx (kafka.server.ReplicaManager)

5. A log append happens at offset 582 even though the nextOffset for the log is 570. This append actually pertains to the fetch request from step 1, which explains the gap in the log. A small worked example follows the log line below.

[2013-03-02 12:37:56,098] TRACE [Kafka Log on Broker 3], Appending message set to test_2-2 offset: 582 nextOffset: 570 messageSet: ByteBufferMessageSet(MessageAndOffset(Message(magic = 0, attributes = 0, crc = 1408289663, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=500 cap=500]),582), MessageAndOffset(Message(magic = 0, attributes = 0, crc = 3696400058, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=500 cap=500]),583), MessageAndOffset(Message(magic = 0, attributes = 0, crc = 2403920749, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=500 cap=500]),584), ) (kafka.log.Log)
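
As a small worked example of why this produces a gap (toy code, not Kafka's Log.append): appending messages that already carry the leader's offsets 582..584 to a log whose next expected offset is 570 leaves logical offsets 570..581 with no messages.

object OffsetGapSketch {
  def main(args: Array[String]): Unit = {
    val afterTruncation = (0L until 570L).toVector // log truncated to HW = 570 holds offsets 0..569
    val staleFetch = Vector(582L, 583L, 584L)      // offsets carried by the in-flight fetch response

    val log = afterTruncation ++ staleFetch        // append without re-assigning offsets

    val missing = ((0L to log.max).toSet -- log.toSet).toVector.sorted
    println(s"missing offsets: ${missing.head}..${missing.last}") // 570..581
  }
}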

From the steps above, it is clear that something is causing the fetch request from step 1 to complete even though steps 2 and 3 removed the fetcher for that topic/partition.

Looking at the code, the cause becomes obvious. The race condition is between the thread that removes the fetcher, truncates the log and adds the fetcher back, and the thread that fetches bytes from the leader. Follow the steps below to understand what is happening; a toy sketch of the interleaving is included after the excerpt.

Partition.scala

          repli
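
A minimal, self-contained sketch of the suspected interleaving (Scala 2.12+). The thread bodies, helper names and the latch are all illustrative, not Kafka's actual Partition, ReplicaFetcherThread or ReplicaManager code:

import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference

object FetcherRaceSketch {
  def main(args: Array[String]): Unit = {
    // Broker 3's replica holds offsets 0..581, so its in-flight fetch asked for 582.
    val log = new AtomicReference[Vector[Long]]((0L until 582L).toVector)
    val truncated = new CountDownLatch(1)

    // Thread A: the replica fetcher, already holding the response to the fetch at offset 582 (step 1).
    val fetcher = new Thread(() => {
      val staleResponse = Vector(582L, 583L, 584L)
      truncated.await()                         // deliberately lose the race for the demo
      log.updateAndGet(v => v ++ staleResponse) // append keeps the leader-assigned offsets
    })

    // Thread B: the leader-and-isr handler (steps 2 to 4).
    val leaderAndIsrHandler = new Thread(() => {
      // removeFetcher(test_2, 2) would happen here
      log.updateAndGet(v => v.filter(_ < 570L)) // truncate to the high watermark, 570
      // addFetcher(test_2, 2, initOffset = 570) would happen here
      truncated.countDown()
    })

    fetcher.start(); leaderAndIsrHandler.start()
    fetcher.join(); leaderAndIsrHandler.join()

    val result = log.get()
    println(s"log now ends at offset ${result.last}")        // 584
    println(s"offset 570 present? ${result.contains(570L)}") // false
  }
}

In this toy model the gap appears because nothing ties "remove the fetcher" to discarding or re-validating any response that fetcher already has in flight.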