

Matt Wise 2012-12-09, 21:22
Matt Wise 2012-12-09, 22:26
Matt Wise 2012-12-09, 22:30
Alan Cabrera 2012-12-10, 19:18
Re: Kazoo 'state listener' issue...
I just want to circle back on this issue. The root cause was that the Kazoo thread that triggers the 'state listener' callbacks holds a lock while doing so, and does not release it until all of the callbacks have finished. If your callback then tries to make a Zookeeper call (i.e., get, set, create, delete) on that same thread, the call waits on that lock and immediately deadlocks.

To handle this scenario they have a 'spawn()' function in the Zookeeper thread handler that you can use:

>             self._zk.handler.spawn(self._re_establish_registrations)

This spawns a thread and immediately returns, allowing the Zookeeper thread to finish its callbacks and release the lock. Thanks a ton to Ben B. for helping me track that down.
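
For anyone who wants to see the mechanics without a Zookeeper server, here is a minimal stdlib-only sketch of the situation described above. It is a toy model under my own assumptions, not Kazoo's actual internals: `ToyClient`, its `fire()` method, and the plain string `'CONNECTED'` are all hypothetical stand-ins, and it is written for Python 3. The only point is that the listener returns right away and the real work runs on another thread once the lock is free.

```python
import threading


class ToyClient(object):
    """Hypothetical stand-in for a client that holds a lock while
    it runs its state listeners (not Kazoo's real implementation)."""

    def __init__(self):
        self._lock = threading.Lock()  # deliberately non-reentrant
        self._listeners = []
        self.created = []

    def add_listener(self, listener):
        self._listeners.append(listener)

    def create(self, path):
        # Every client operation needs the same lock the callbacks run under.
        with self._lock:
            self.created.append(path)

    def fire(self, state):
        # Listeners run while the lock is held, as described above.
        with self._lock:
            for listener in self._listeners:
                listener(state)

    def spawn(self, func, *args):
        # Start the work on a separate thread and return immediately.
        thread = threading.Thread(target=func, args=args)
        thread.start()
        return thread


client = ToyClient()
spawned = []


def re_establish_registrations():
    client.create('/abc/a')


def state_listener(state):
    # Calling client.create() directly here would block forever on the lock.
    if state == 'CONNECTED':
        spawned.append(client.spawn(re_establish_registrations))


client.add_listener(state_listener)
client.fire('CONNECTED')   # returns and releases the lock
for thread in spawned:
    thread.join()          # the spawned work can now acquire the lock
```

In the toy version the spawned thread simply waits until `fire()` has released the lock, which is the same shape as the `handler.spawn()` fix.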

--Matt

On Dec 10, 2012, at 11:18 AM, Alan Cabrera <[EMAIL PROTECTED]> wrote:

> Without really digging into this I'll toss in my initial observation.
>
> Calling zk while still being inside a zk callback seems a bit dangerous.  I would have a queue and event thread and have work from the callbacks feed this queue which would be executed inside the event thread.
>
>
> Regards,
> Alan
>
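
The queue-and-event-thread approach suggested above could look roughly like the following. This is only a sketch under assumptions: `re_register`, the `'CONNECTED'` string, and the `None` shutdown sentinel are hypothetical names chosen for illustration, the real Zookeeper call is replaced by a list append, and it targets Python 3.

```python
import queue
import threading

work = queue.Queue()
results = []


def re_register(path):
    # Placeholder for the real Zookeeper call (e.g. a create()).
    results.append(path)


def event_loop():
    # Runs in its own thread, so jobs execute outside any client callback.
    while True:
        job = work.get()
        if job is None:  # sentinel: shut the loop down
            break
        func, args = job
        func(*args)


def state_listener(state):
    # Called on the client's callback thread: enqueue only, never block.
    if state == 'CONNECTED':
        work.put((re_register, ('/abc/a',)))


worker = threading.Thread(target=event_loop)
worker.start()
state_listener('CONNECTED')
work.put(None)
worker.join()
```

Compared with spawning a thread per callback, a single event thread keeps the re-registration work serialized, at the cost of one long-lived thread to manage.
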
> On Dec 9, 2012, at 2:30 PM, Matt Wise wrote:
>
>> Just to clarify, if you go and change test() into:
>>
>>>   def test(self):
>>>       # now register a node
>>>       self.register_node('/abc/a')
>>>       self._zk.stop()
>>>       self._zk.start()
>>>       self.register_node('/abc/a')
>>>       self._zk.get_children('/abc')
>>>
>>
>> and then remove these lines from the state_handler() method:
>>>
>>>>          for node in nodes.iteritems():
>>>>              self.register_node(node[0], data=node[1])
>>>
>>
>> then it works perfectly: no hang, nothing. It seems that register_node cannot be called from within the state handler. Why?
>>
>> On Dec 9, 2012, at 2:26 PM, Matt Wise <[EMAIL PROTECTED]> wrote:
>>
>>> Hrmm here's a cleaner way to reproduce the issue:
>>>
>>> test.py:
>>>> from kazoo.client import KazooClient
>>>> from kazoo.client import KazooState
>>>> from kazoo.handlers.threading import TimeoutError
>>>> from kazoo.handlers.gevent import SequentialGeventHandler
>>>> import logging
>>>>
>>>>
>>>> class Test(object):
>>>>  def __init__(self):
>>>>      self.log = logging.getLogger()
>>>>      format = 'zk_watcher[%(name)s-%(thread)d-%(funcName)s: (%(levelname)s) %(message)s'
>>>>      self.log.setLevel(logging.INFO)
>>>>      formatter = logging.Formatter(format)
>>>>      handler = logging.StreamHandler()
>>>>      handler.setFormatter(formatter)
>>>>      self.log.addHandler(handler)
>>>>
>>>>      self.registered_nodes = {}
>>>>
>>>>      self.log.setLevel(logging.DEBUG)
>>>>
>>>>      self._zk = KazooClient(hosts='localhost:2182', handler=SequentialGeventHandler())
>>>>      self._zk.start()
>>>>      self._zk.add_listener(self._state_listener)
>>>>      self._state_listener(self._zk.state)
>>>>
>>>>  def test(self):
>>>>      # now register a node
>>>>      self.register_node('/abc/a')
>>>>      self._zk.stop()
>>>>      self._zk.start()
>>>>      self._zk.get_children('/abc')
>>>>
>>>>  def register_node(self, node, data=None):
>>>>      if node in self.registered_nodes:
>>>>          if data == self.registered_nodes[node]:
>>>>              self.log.debug('Already registered [%s] in data provider.' % node)
>>>>              return
>>>>      self.log.debug('Registering [%s] in data provider.' % node)
>>>>      self._zk.create(node, ephemeral=True, makepath=True)
>>>>      self.registered_nodes[node] = data
>>>>
>>>>
>>>>  def _state_listener(self,state):
>>>>      self.log.warning('Zookeeper connection state changed: %s' % state)
>>>>      if state == KazooState.SUSPENDED:
>>>>          self.CONNECTION_STATE=False
>>>>      elif state == KazooState.LOST:
>>>>          self.CONNECTION_STATE=False
>>>>      else:
>>>>          self.CONNECTION_STATE=True
>>>>          nodes = {}
>>>>          print self.registered_nodes
>>>>          try:
>>>>              nodes = self.registered_nodes
Alan Cabrera 2012-12-24, 17:28