Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka >> mail # dev >> [jira] [Updated] (KAFKA-956) High-level consumer fails to check topic metadata response for errors


Copy link to this message
-
[jira] [Updated] (KAFKA-956) High-level consumer fails to check topic metadata response for errors

     [ https://issues.apache.org/jira/browse/KAFKA-956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sam Meder updated KAFKA-956:
----------------------------

    Status: Patch Available  (was: Open)
    
> High-level consumer fails to check topic metadata response for errors
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-956
>                 URL: https://issues.apache.org/jira/browse/KAFKA-956
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8
>            Reporter: Sam Meder
>            Assignee: Neha Narkhede
>            Priority: Blocker
>             Fix For: 0.8
>
>         Attachments: consumer_metadata_fetch.patch
>
>
> In our environment we noticed that consumers would sometimes hang when started too close to starting the Kafka server. I tracked this down and it seems to be related to some code in rebalance (ZookeeperConsumerConnector.scala). In particular the following code seems problematic:
>       val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
>                                                           brokers,
>                                                           config.clientId,
>                                                           config.socketTimeoutMs,
>                                                           correlationId.getAndIncrement).topicsMetadata
>       val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
>       topicsMetadata.foreach(m => {
>         val topic = m.topic
>         val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
>         partitionsPerTopicMap.put(topic, partitions)
>       })
> The response is never checked for error, so may not actually contain any partition info! Rebalance goes its merry way, but doesn't know about any partitions so never assigns them...

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

 
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB