Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka, mail # user - 0.8 protocol exception


Copy link to this message
-
Re: 0.8 protocol exception
Colin Blower 2013-07-08, 21:14
I had the exact same problem when I started writing code for the new
protocol. This is an oddity with the way the protocol spec uses EBNF to
specify arrays.

Checkout the section on protocol primitives, especially arrays.
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProtocolPrimitiveTypes

Essentially, each array is preceded by its length as an int32.

With regards to your specific buffer:
<Buffer 00 00 00 16 00 03 00 00 00 00 00 00 00 03 66 6f 6f 00 07 6d 79
54 6f 70 69 63>

You seem to be missing the clientId as well as the array length. For
comparison the buffer, with the client I wrote, for clientId
"perl-kafka" and topics "foo" and "myTopic":

00 00 00 26 00 03 00 00 00 00 00 2a 00 0a 70 65 72 6c 2d 6b 61 66 6b 61
00 00 00 02 00 03 66 6f 6f 00 07 6d 79 54 6f 70 69 63
On 07/08/2013 04:12 AM, Vinicius Carvalho wrote:
> Ok, so I've found out the error: The documentation is outdated, the
> MetadataRequest BNF should be:
>
> NumberOfTopics [TopicList]
>
> Had to check the scala source code for that.
>
> Is there a place with a most to date doc?
>
> Regards
>
>
> On Mon, Jul 8, 2013 at 6:42 AM, Vinicius Carvalho <
> [EMAIL PROTECTED]> wrote:
>
>> Hi there. I'm building the 0.8 version of a client to nodejs. I never
>> coded for node and most of my code is following what the prozees guys did
>> (I'm talking to them on updating the lib)
>>
>> But, I'm facing some errors when I test a very simple metadata request.
>> I'm getting this exception on kafka:
>>
>> java.nio.BufferUnderflowException
>> at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145)
>>  at java.nio.ByteBuffer.get(ByteBuffer.java:694)
>> at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:38)
>>  at
>> kafka.api.TopicMetadataRequest$$anonfun$readFrom$1.apply(TopicMetadataRequest.scala:44)
>> at
>> kafka.api.TopicMetadataRequest$$anonfun$readFrom$1.apply(TopicMetadataRequest.scala:43)
>>  at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282)
>> at scala.collection.immutable.Range$$anon$2.foreach(Range.scala:265)
>>  at
>> kafka.api.TopicMetadataRequest$.readFrom(TopicMetadataRequest.scala:43)
>> at kafka.api.RequestKeys$$anonfun$4.apply(RequestKeys.scala:37)
>>  at kafka.api.RequestKeys$$anonfun$4.apply(RequestKeys.scala:37)
>> at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:49)
>>  at kafka.network.Processor.read(SocketServer.scala:345)
>> at kafka.network.Processor.run(SocketServer.scala:245)
>>  at java.lang.Thread.run(Thread.java:722)
>>
>> My test sends a metadatarequest (api code 3) using a clientId "foo",
>> correlationId 0 and topicName "myTopic"
>>
>> I don't know where I'm missing something. Please find the js code bellow
>> and also a java counter version that I've created just for the sake of my
>> lack of experience with node and js. I get the same error with the java
>> version:
>>
>> Request.prototype.toBytes = function() {
>>     var encoded = new BufferMaker()
>>         .Int16BE(this.apiKey)
>>         .Int16BE(API_VERSION)
>>         .Int32BE(this.correlationId)
>>         .Int16BE(Buffer.byteLength(this.clientId,'UTF-8'))
>>         .string(this.clientId)
>>         .string(this.requestMessage.toBytes()).make();
>>     var bytes = new
>> BufferMaker().Int32BE(encoded.length).string(encoded).make();
>>
>> MetadataRequest.prototype.toBytes = function () {
>>     var bytes = new
>> BufferMaker().Int16BE(Buffer.byteLength(this.topicName,'UTF-8')).string(this.topicName).make();
>>      return bytes;
>> }
>>
>> var req = new Request(3,0,"foo",new MetadataRequest("myTopic"));
>> var status = connection.write(req.toBytes());
>>
>> Here's the content of the buffer:
>>
>> <Buffer 00 00 00 16 00 03 00 00 00 00 00 00 00 03 66 6f 6f 00 07 6d 79 54
>> 6f 70 69 63>
>>
>> And the java version (using vertx netclient):
>>
>> Buffer writeBuffer = new Buffer()
>>
>>      .appendShort((short) 3) //metadataRequest
>>
>>      .appendShort((short) 0) //ApiVersion
*Colin Blower*
/Software Engineer/
Barracuda Networks Inc.
+1 408-342-5576 (o)