Kafka >> mail # dev >> Non-blocking socket when queue has no new data


Non-blocking socket when queue has no new data
Hi,

I have a question about non-blocking connections.

From the code I see the request is handled by an acceptor thread that
passes it on to a Processor thread.
The connection is blocking by default, i.e. the consumer sends the fetch
request and then blocks until data is available.
I understand that this is exactly the intended behaviour, but I'm
wondering whether it's possible to establish a non-blocking connection that
returns immediately if there's no data available at the given offset
(assuming the offset is valid and points at the end of the queue).
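To illustrate what I mean at the plain-socket level (a Python sketch for illustration only, not Kafka client code): with a non-blocking socket, a read with no data available returns immediately instead of parking the caller, which is the behaviour we'd like for the fetch request.

```python
import socket

# Illustration only: a connected socket pair where one end has sent
# nothing, mimicking a fetch at the current end of the log.
a, b = socket.socketpair()

# In non-blocking mode, recv() raises EWOULDBLOCK (BlockingIOError in
# Python) immediately instead of blocking until data arrives.
b.setblocking(False)
try:
    b.recv(1024)
    got_data = True
except BlockingIOError:
    got_data = False

print(got_data)  # False: the call returned immediately with no data
a.close()
b.close()
```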

We've been load-testing Kafka with a few thousand topics and many
short-lived consumers that fetch a limited number of messages before
closing the connection.
We have a socket timeout on the client side to close the socket if there's
no data available, but the Kafka server doesn't close the socket at its end
until new data becomes available and a write() call is attempted. When this
happens, we see the following stack trace in the logs:
=============================================
kafka: INFO  [kafka.network.Processor] (kafka-processor-7) Closing socket
connection to <host>.
kafka: ERROR [kafka.network.Processor] (kafka-processor-5) Closing socket
for <host> because of error
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:405)
at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:506)
at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:102)
at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:53)
at kafka.network.Processor.write(SocketServer.scala:339)
at kafka.network.Processor.run(SocketServer.scala:216)
at java.lang.Thread.run(Thread.java:662)

=============================================
Apart from the exception in the logs, the file descriptor for the socket is
not released until a write() call is attempted, which limits the number of
connections that can be established.
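The scenario can be reproduced outside Kafka with a small Python sketch (hypothetical server/client, loopback only): the client sets a socket timeout, gets no data, and closes; the server side only notices the dead peer once it finally tries to write, mirroring the "Connection reset by peer" in the stack trace above.

```python
import socket
import time

# Hypothetical "server" that accepts a connection but never sends data.
srv = socket.socket()
srv.bind(("127.0.0.1", 0))   # ephemeral port, illustration only
srv.listen(1)
port = srv.getsockname()[1]

cli = socket.socket()
cli.settimeout(0.2)          # client-side socket timeout, as described
cli.connect(("127.0.0.1", port))
conn, _ = srv.accept()

timed_out = False
try:
    cli.recv(1024)           # no data ever arrives -> times out
except socket.timeout:
    timed_out = True
cli.close()                  # client gives up and closes its end

time.sleep(0.1)              # let the FIN reach the server side

# The server-side socket is still open; the broken connection only
# surfaces as an error once a write is attempted.
write_failed = False
try:
    for _ in range(100):
        conn.sendall(b"x" * 65536)  # peer answers with RST -> error
except OSError:
    write_failed = True

conn.close()
srv.close()
print(timed_out, write_failed)
```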

I do realise that Kafka was designed to work with long-running processes
that maintain a persistent connection, but I see a lot of interest on the
mailing list around putting a REST interface in front of it, or otherwise
consuming data in chunks from short-lived processes, and we're very
interested in this scenario as well for one of our use-cases.

So my question really is: is there a plan to support a non-blocking
connection?
In non-blocking mode, the response could be immediate and could consist of
the header alone (an int32 for the response length, set to 0, followed by an
int16 for the error code, which could be set to 0 too, or to a new value
indicating that the operation would normally block).
We can probably contribute this feature if it's not yet available and
others find it useful.
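To make the proposal concrete, a sketch of that empty-response header (big-endian, as in Kafka's wire format; the "would block" error value here is made up for illustration, not an existing Kafka error code):

```python
import struct

# Hypothetical new error code meaning "the fetch would normally block".
WOULD_BLOCK = -1

def encode_empty_response(error_code=0):
    # int32 response length (set to 0) followed by an int16 error code,
    # big-endian as in Kafka's wire format.
    return struct.pack(">ih", 0, error_code)

def decode_empty_response(buf):
    length, error_code = struct.unpack(">ih", buf)
    return length, error_code

frame = encode_empty_response(WOULD_BLOCK)
print(len(frame), decode_empty_response(frame))  # 6 (0, -1)
```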

Another option (which I don't like as much) could be setting a request
timeout (server-side), or adding a way to close the connection gracefully
from the client side.
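By "gracefully" I mean something like a TCP half-close (again a plain-socket Python sketch, not Kafka code): the client sends a FIN via shutdown(), so the other end sees EOF on its next read instead of a "connection reset" error on a later write.

```python
import socket

# Illustration: a half-close lets the peer detect the departure cleanly.
a, b = socket.socketpair()
a.shutdown(socket.SHUT_WR)   # client signals "no more requests" (FIN)
eof = b.recv(1024)           # the other end's read returns b'' (EOF)
print(eof == b"")            # True
a.close()
b.close()
```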

Thoughts?

Best regards,
--
Lorenzo Alberton
Chief Tech Architect
DataSift, Inc.
Jun Rao 2012-08-08, 15:00