offset fetch request - response issue and wire protocol kafka 0.8
Given
https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html#AGuideToTheKafkaProtocol-OffsetFetchRequest
as the basis:

OffsetFetchRequest => ConsumerGroup [TopicName [Partition]]
  ConsumerGroup => string
  TopicName => string
  Partition => int32

and

OffsetFetchResponse => [TopicName [Partition Offset Metadata ErrorCode]]
  TopicName => string
  Partition => int32
  Offset => int64
  Metadata => string
  ErrorCode => int16

Request parameters: CorrelationId: 3500000001, ClientId: ERLKAFKA, ApiKey: 7,
ApiVersion: 0, ConsumerGroup: CG, TopicName: Topic1, Partition: 0

The following bytes are produced in the request:

Field                        Bytes
RQLEN (request length)       0,0,0,42
APK (ApiKey)                 0,7
APV (ApiVersion)             0,0
CorrelationId                208,157,195,1
CLE (ClientId length)        0,8
ClientID "ERLKAFKA"          69,82,76,75,65,70,75,65
ConsumerGroup length         0,2
ConsumerGroup "CG"           67,71
Topic array count            0,0,0,1
TopicName length             0,6
TopicName "Topic1"           84,111,112,105,99,49
Partition array count        0,0,0,1
Partition 0                  0,0,0,0
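
For reference, the request bytes above can be reproduced with a short script. This is a minimal sketch, assuming the framing described in the protocol guide: a 4-byte size prefix, then ApiKey, ApiVersion, CorrelationId and ClientId, with strings prefixed by an int16 length and arrays by an int32 count. The helper names `kafka_string` and `offset_fetch_request` are made up for illustration and do not come from any Kafka client.

```python
import struct


def kafka_string(s):
    """Encode a string as a big-endian int16 length followed by its bytes."""
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def offset_fetch_request(api_key, api_version, correlation_id, client_id,
                         group, topic, partition):
    """Build one size-prefixed request for a single topic and partition."""
    body = struct.pack(">hh", api_key, api_version)
    # 3500000001 does not fit in a signed int32, so pack the raw 32 bits
    # as an unsigned value; the bytes on the wire are the same.
    body += struct.pack(">I", correlation_id & 0xFFFFFFFF)
    body += kafka_string(client_id)
    body += kafka_string(group)
    body += struct.pack(">i", 1) + kafka_string(topic)            # one topic
    body += struct.pack(">i", 1) + struct.pack(">i", partition)   # one partition
    return struct.pack(">i", len(body)) + body


request = offset_fetch_request(7, 0, 3500000001, "ERLKAFKA", "CG", "Topic1", 0)
print(",".join(str(b) for b in request))
```

The printed sequence is the same 46 bytes listed above (a 4-byte length of 42 plus the 42-byte body), so the encoding is at least consistent with the grammar as quoted. Whether ApiKey 7 is the value the broker expects for this request is a separate question that the script does not check.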
I get a response of

Field            Bytes
RELEN            0,0,0,10
CorrelationId    208,157,195,1
????             255,255,0,0,0,0
which seems to be wrong according to the wire protocol.
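
One way to check the response against the quoted OffsetFetchResponse grammar is to split the frame and read the first field of the body. The snippet below is a sketch under the assumption that the response is framed as a 4-byte size, a 4-byte CorrelationId, and then the body; the variable names are illustrative only.

```python
import struct

response = bytes([0, 0, 0, 10, 208, 157, 195, 1, 255, 255, 0, 0, 0, 0])

# 4-byte big-endian size prefix, then that many bytes of payload.
(size,) = struct.unpack(">i", response[:4])
payload = response[4:4 + size]

# Read the correlation id unsigned so it can be compared with 3500000001.
(correlation_id,) = struct.unpack(">I", payload[:4])
body = payload[4:]

# Under the quoted grammar the body would begin with an int32 topic count.
(topic_count,) = struct.unpack(">i", body[:4])

print(size, correlation_id, list(body), topic_count)
```

The size (10) and CorrelationId (3500000001) come back as expected, but the first four body bytes read as a count of -65536, which cannot be the start of a [TopicName [...]] array. That fits the impression that the body is not an OffsetFetchResponse.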

Also, on the Kafka console, I get:

[2013-06-17 01:25:36,440] ERROR [KafkaApi-1] error when handling request Name: ControlledShutdownRequest; Version: 0; CorrelationId: -794967295; BrokerId: 542034 (kafka.server.KafkaApis)
kafka.common.BrokerNotAvailableException: Broker id 542034 does not exist.
	at kafka.controller.KafkaController.shutdownBroker(KafkaController.scala:139)
	at kafka.server.KafkaApis.handleControlledShutdownRequest(KafkaApis.scala:134)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:73)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
	at java.lang.Thread.run(Thread.java:722)
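
The two numbers in the log line can be compared with the request bytes directly. The following is only a sketch of that arithmetic; it assumes nothing about how the broker actually parsed the request, and the variable names are illustrative.

```python
import struct

# First 16 bytes of the request: size, ApiKey, ApiVersion, CorrelationId,
# then the ClientId length and the first two ClientId bytes ("ER").
request_prefix = bytes([0, 0, 0, 42, 0, 7, 0, 0,
                        208, 157, 195, 1, 0, 8, 69, 82])

# The correlation id bytes read as a signed big-endian int32.
(signed_correlation_id,) = struct.unpack(">i", request_prefix[8:12])

# The four bytes that follow the correlation id, read as one int32.
(next_four,) = struct.unpack(">i", request_prefix[12:16])

print(signed_correlation_id, next_four)
```

Both values coincide with the log: the signed reading of 3500000001 is -794967295, and the four bytes after the CorrelationId read as 542034, the BrokerId the broker reports. This suggests the broker treated the request as a ControlledShutdownRequest, though that is an observation about the bytes rather than a confirmed diagnosis.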

What am I doing wrong?

Thanks

Milind