Kafka >> mail # user >> Running Kafka in local test mode


Ken Krugler 2012-05-06, 15:21
Ken Krugler 2012-05-06, 16:53
Jun Rao 2012-05-07, 15:10
Ken Krugler 2012-05-09, 00:34
Jun Rao 2012-05-09, 00:40
Ken Krugler 2012-05-09, 19:01
Jun Rao 2012-05-09, 21:03
Ken Krugler 2012-05-09, 21:47
Jun Rao 2012-05-10, 00:12
Hisham Mardam-Bey 2012-05-10, 00:20
Ken Krugler 2012-05-10, 01:14
Hisham Mardam-Bey 2012-05-10, 03:00
Re: Running Kafka in local test mode
Hi Hisham,

> I've whipped up a quick Scala example of an embedded Kafka broker,
> producer, and consumer. I don't use ZK and I use the simple consumer
> (manual offset management).
>
> https://gist.github.com/2650743
>
> Hopefully this helps clear things up a bit.

Yes, thanks - and sorry for the delay in responding, I had to step away from this project for a bit.

I was easily able to get the equivalent Java code running.

The remaining question I've got now is how to cleanly shut things down.

I can terminate the Kafka server, but I can't terminate the threads that are running as consumers.

Specifically, when I shut down the executor that runs the consumer threads, the shutdown fails, and I see this in the logs:

[2012-06-27 20:07:29,696] INFO fetch reconnect due to java.nio.channels.ClosedByInterruptException (kafka.consumer.SimpleConsumer)

The consumer threads are blocked in SimpleConsumer#fetch, and because fetch reconnects after the interrupt, the call never returns.

Any input on how best to terminate properly in a test environment?
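
To make the problem concrete, here is a minimal sketch of the pattern I have in mind, written without any Kafka classes. RetryingFetcher is a hypothetical stand-in for a consumer whose fetch absorbs interrupts and keeps going (which is what the log line suggests), and StoppableConsumerLoop, stop() and demo() are names I made up for illustration. The idea is that interrupting the executor is not enough on its own, so the loop also needs an explicit stop flag plus a way to make the blocking call return. Whether the real SimpleConsumer can be closed safely from another thread, or whether a short socket timeout is the better way to get fetch to return, is an assumption to check against the Kafka version in use.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a consumer whose fetch() swallows interrupts.
// This is NOT a Kafka class; it only imitates the behavior described above.
final class RetryingFetcher {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    // Blocks for up to timeoutMs, like a fetch with a socket timeout.
    // Returns false once the fetcher has been closed.
    boolean fetch(long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!closed.get() && System.nanoTime() < deadline) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                // Imitates the "fetch reconnect": the interrupt is absorbed
                // and the call carries on as if nothing happened.
            }
        }
        return !closed.get();
    }

    void close() {
        closed.set(true);
    }
}

final class StoppableConsumerLoop implements Runnable {
    private final RetryingFetcher fetcher = new RetryingFetcher();
    private volatile boolean running = true;

    @Override
    public void run() {
        // The flag is re-checked every time fetch() returns.
        while (running && fetcher.fetch(50)) {
            // Process fetched messages here.
        }
    }

    // Explicit stop: clear the flag, then make the blocking call return.
    void stop() {
        running = false;
        fetcher.close();
    }

    // Returns true if interrupting alone did NOT end the loop
    // but calling stop() did.
    static boolean demo() {
        StoppableConsumerLoop loop = new StoppableConsumerLoop();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(loop);
        try {
            Thread.sleep(100);          // let the loop get going
            executor.shutdownNow();     // interrupts the worker thread
            boolean endedByInterrupt = executor.awaitTermination(200, TimeUnit.MILLISECONDS);
            loop.stop();
            boolean endedByStop = executor.awaitTermination(2, TimeUnit.SECONDS);
            return !endedByInterrupt && endedByStop;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            loop.stop();
            return false;
        }
    }
}
```

In a test teardown this would mean calling stop() on every consumer loop before shutting down the executor, and only then shutting down the broker.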

Thanks!

-- Ken
> On Wed, May 9, 2012 at 9:14 PM, Ken Krugler <[EMAIL PROTECTED]> wrote:
>> Hi Hisham,
>>
>> Thanks for chiming in - see below.
>>
>>> When you call shutdown() on KafkaServerStartable you should also call
>>> awaitShutdown if you're expecting your app / JVM to terminate (perhaps
>>> you are not).
>>
>> It's been shutting down properly, but you're right that calling awaitShutdown() ensures that everything has been properly cleaned up before I continue.
>>
>>> Also take into account that if the embedded KafkaServer
>>> instance in KafkaServerStartable throws an exception on shutdown it
>>> will call Runtime.getRuntime.halt(1) (not sure if you want this
>>> behaviour).
>>
>> Not really, but hopefully my unit tests aren't triggering exceptions in the KafkaServer :)
>>
>>> I'm embedding a Kafka broker and producer in an experimental lib doing
>>> pretty much what you're doing except I use KafkaServer directly and
>>> make no use of ZK.
>>
>> I initially tried that, but ran into the issue where Kafka consumers always create a ZooKeeper client, which expects to have a ZooKeeper server running.
>>
>> Do you know if it's possible to set up a complete Kafka environment (broker, producer, consumer) without ZooKeeper?
>>
>> -- Ken
>>
>> --------------------------
>> Ken Krugler
>> http://www.scaleunlimited.com
>> custom big data solutions & training
>> Hadoop, Cascading, Mahout & Solr
>>
>>
>>
>>
>
>
>
> --
> Hisham Mardam-Bey
> [ Director of Engineering ] [ Mate1 Inc. ]
>
> A: Because it messes up the order in which people normally read text.
> Q: Why is top-posting such a bad thing?
> A: Top-posting.
> Q: What is the most annoying thing in e-mail?
>
> -=[ Codito Ergo Sum ]=-

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Mahout & Solr