Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Plain View
Kafka, mail # dev - [jira] [Updated] (KAFKA-956) High-level consumer fails to check topic metadata response for errors


Copy link to this message
-
[jira] [Updated] (KAFKA-956) High-level consumer fails to check topic metadata response for errors
"Sam Meder 2013-06-25, 09:00

     [ https://issues.apache.org/jira/browse/KAFKA-956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sam Meder updated KAFKA-956:
----------------------------

    Status: Patch Available  (was: Open)
    
> High-level consumer fails to check topic metadata response for errors
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-956
>                 URL: https://issues.apache.org/jira/browse/KAFKA-956
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8
>            Reporter: Sam Meder
>            Assignee: Neha Narkhede
>            Priority: Blocker
>             Fix For: 0.8
>
>         Attachments: consumer_metadata_fetch.patch
>
>
> In our environment we noticed that consumers would sometimes hang when started too close to starting the Kafka server. I tracked this down and it seems to be related to some code in rebalance (ZookeeperConsumerConnector.scala). In particular the following code seems problematic:
>       val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
>                                                           brokers,
>                                                           config.clientId,
>                                                           config.socketTimeoutMs,
>                                                           correlationId.getAndIncrement).topicsMetadata
>       val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
>       topicsMetadata.foreach(m => {
>         val topic = m.topic
>         val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
>         partitionsPerTopicMap.put(topic, partitions)
>       })
> The response is never checked for error, so may not actually contain any partition info! Rebalance goes its merry way, but doesn't know about any partitions so never assigns them...

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

 
+
Sam Meder 2013-09-20, 04:25