clean shutdown after zookeeper connection failure

I'm embedding the Kafka server (0.7.2) in an application container. I've
noticed that if I try to start the server while ZooKeeper is unavailable,
it gets a ZooKeeper connection timeout after 6 seconds (the default) and
then throws an exception out of KafkaServer.startup(). For example, I see
this stack trace:

Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 6000
        at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:876)
        at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
        at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
        at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:44)
        at kafka.log.LogManager.<init>(LogManager.scala:93)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:58)

That part is fine: I can catch the exception and then shut everything down
gracefully. However, when I do this, a background thread is still around
and never quits (presumably a non-daemon thread, since a daemon thread
would not block exit), so the JVM never actually exits. Specifically,
this thread hangs around:

"kafka-logcleaner-0" prio=5 tid=7fd9b48b1000 nid=0x112c08000 waiting on condition [112c07000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <7f40d4be8> (a
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
        at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
        at java.lang.Thread.run(Thread.java:680)
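
For anyone trying to confirm which threads are keeping a JVM alive without taking a full thread dump, here is a minimal sketch using only the JDK. The class name, the "demo-logcleaner-0" thread name, and the stand-in scheduler are all hypothetical and are not Kafka code; the sketch only assumes that the lingering thread is a non-daemon scheduler worker like the one in the dump above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class NonDaemonThreads {
    /** Names of live non-daemon threads other than the caller; any of these can keep the JVM alive. */
    static List<String> liveNonDaemonThreadNames() {
        List<String> names = new ArrayList<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive() && !t.isDaemon() && t != Thread.currentThread()) {
                names.add(t.getName());
            }
        }
        return names;
    }

    /** Hypothetical stand-in for the log cleaner: a scheduler whose worker thread is non-daemon. */
    static ScheduledThreadPoolExecutor startDemoScheduler() {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, r -> {
            Thread t = new Thread(r, "demo-logcleaner-0");
            t.setDaemon(false);
            return t;
        });
        // Scheduling a periodic task starts the worker thread, which then parks until the task is due.
        scheduler.scheduleAtFixedRate(() -> { }, 60, 60, TimeUnit.SECONDS);
        return scheduler;
    }

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor scheduler = startDemoScheduler();
        System.out.println("non-daemon threads: " + liveNonDaemonThreadNames());
        // Without this call the worker thread would keep the JVM running after main returns.
        scheduler.shutdownNow();
    }
}
```

Calling liveNonDaemonThreadNames() from the catch block after a failed startup would show whether a scheduler thread is still around.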

Looking at the code in kafka.log.LogManager, it appears to start the
log-cleanup scheduler before trying to connect to ZooKeeper (which, in
this case, fails):

  /* Schedule the cleanup task to delete old logs */
  if(scheduler != null) {
    info("starting log cleaner every " + logCleanupIntervalMs + " ms")
    scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
  }

So the scheduler does not appear to be stopped if startup fails. However,
if I catch the RuntimeException above and then call
KafkaServer.shutdown(), it stops the scheduler, and the JVM exits as
expected.
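
To make the workaround concrete, here is a rough sketch of the caller-side pattern. The Server interface is a hypothetical stand-in for the two KafkaServer methods mentioned above (startup() and shutdown()), so the example compiles without Kafka on the classpath; it is not the Kafka API itself.

```java
class EmbeddedStartup {
    /** Hypothetical stand-in for the startup()/shutdown() pair on KafkaServer. */
    interface Server {
        void startup();
        void shutdown();
    }

    /**
     * Starts the server. If startup() throws, calls shutdown() so that anything
     * already started (such as a scheduler thread) is stopped, then reports failure.
     */
    static boolean startOrCleanUp(Server server) {
        try {
            server.startup();
            return true;
        } catch (RuntimeException startupFailure) {
            try {
                server.shutdown();
            } catch (RuntimeException shutdownFailure) {
                // Keep the original cause visible even if cleanup itself fails.
                startupFailure.addSuppressed(shutdownFailure);
            }
            System.err.println("startup failed: " + startupFailure);
            return false;
        }
    }

    public static void main(String[] args) {
        Server failing = new Server() {
            public void startup() { throw new IllegalStateException("simulated connection timeout"); }
            public void shutdown() { System.out.println("shutdown called after failed startup"); }
        };
        System.out.println("started: " + startOrCleanUp(failing));
    }
}
```

Wrapping shutdown() in its own try/catch is a defensive choice here, since it is unclear whether shutdown() is safe to call on a partially started server.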

Still, it seems odd that after getting an exception from
KafkaServer.startup() I should have to call KafkaServer.shutdown().
Wouldn't it be better for startup() to clean up after itself internally
when it gets an exception? I'm not sure I can reliably call shutdown()
after a failed startup().
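
To illustrate what I mean by cleaning up internally, here is a minimal sketch of the shape I have in mind. SelfCleaningServer and its connect step are hypothetical and only mimic the ordering described above (scheduler first, connection second); this is not how KafkaServer is actually written.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SelfCleaningServer {
    private final Runnable connect;                 // hypothetical step that may fail, e.g. a ZooKeeper connection
    private ScheduledThreadPoolExecutor scheduler;  // null until startup() creates it

    SelfCleaningServer(Runnable connect) {
        this.connect = connect;
    }

    /** Starts the scheduler, then connects; on failure, undoes whatever was started and rethrows. */
    void startup() {
        try {
            scheduler = new ScheduledThreadPoolExecutor(1);
            scheduler.scheduleAtFixedRate(() -> { }, 60, 60, TimeUnit.SECONDS);
            connect.run();
        } catch (RuntimeException e) {
            shutdown();
            throw e;
        }
    }

    /** Safe to call at any point, including repeatedly: skips components that were never started. */
    void shutdown() {
        if (scheduler != null) {
            scheduler.shutdownNow();
            scheduler = null;
        }
    }

    boolean isSchedulerRunning() {
        return scheduler != null && !scheduler.isShutdown();
    }

    public static void main(String[] args) {
        SelfCleaningServer server = new SelfCleaningServer(() -> {
            throw new IllegalStateException("simulated connection timeout");
        });
        try {
            server.startup();
        } catch (IllegalStateException e) {
            // prints "startup failed, scheduler running: false"
            System.out.println("startup failed, scheduler running: " + server.isSchedulerRunning());
        }
    }
}
```

With this shape the caller still sees the original exception, but no scheduler thread is left behind, and an extra shutdown() call afterwards is harmless.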