Zookeeper >> mail # user >> Kazoo 'state listener' issue...


Re: Kazoo 'state listener' issue...
I just want to circle back on this issue. The root cause was that the Kazoo thread that triggers the 'state listener' callbacks holds a lock while it runs them and does not release it until all of the callbacks have finished. If your callback then uses that same thread to make a Zookeeper call (e.g., get, set, create, delete), the call waits on that lock and you deadlock immediately.

To handle this scenario, the handler attached to the Kazoo client has a 'spawn()' function that you can use:

>             self._zk.handler.spawn(self._re_establish_registrations)

This spawns a thread and immediately returns, allowing the Zookeeper thread to finish its callbacks and release the lock. Thanks a ton to Ben B. for helping me track that down.
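
The locking behavior described above can be reproduced without a Zookeeper server. What follows is a minimal Python 3 sketch built only on the standard library. ToyClient and all of its methods are hypothetical stand-ins, not Kazoo's actual internals, and the timeout on create() exists only so the demo finishes instead of hanging the way the real call did.

```python
import threading


class ToyClient(object):
    """Hypothetical stand-in that holds a lock while running listeners."""

    def __init__(self):
        self._lock = threading.Lock()  # non-reentrant on purpose
        self._listeners = []
        self.created = []

    def add_listener(self, func):
        self._listeners.append(func)

    def create(self, path, timeout=0.2):
        # A real blocking call would wait forever here; the timeout
        # turns the deadlock into a visible False.
        if not self._lock.acquire(timeout=timeout):
            return False
        try:
            self.created.append(path)
            return True
        finally:
            self._lock.release()

    def spawn(self, func, *args):
        # Start the work on another thread and return right away.
        thread = threading.Thread(target=func, args=args)
        thread.start()
        return thread

    def fire_state_change(self, state):
        # The lock is held until every listener has returned.
        with self._lock:
            for func in self._listeners:
                func(state)


client = ToyClient()
results = {}
workers = []


def blocking_listener(state):
    # Calls back into the client while the dispatcher still holds the lock.
    results['direct'] = client.create('/abc/a')


def spawning_listener(state):
    # Hands the work off, so the dispatcher can finish and release the lock.
    def work():
        results['spawned'] = client.create('/abc/b', timeout=5)
    workers.append(client.spawn(work))


client.add_listener(blocking_listener)
client.add_listener(spawning_listener)
client.fire_state_change('CONNECTED')
for worker in workers:
    worker.join()
print(results)
```

The direct call cannot get the lock because the dispatcher still holds it. The spawned call simply waits until the listeners have returned and the lock is free, which is the same reason the handler.spawn() line above avoids the hang.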

--Matt

On Dec 10, 2012, at 11:18 AM, Alan Cabrera <[EMAIL PROTECTED]> wrote:

> Without really digging into this I'll toss in my initial observation.
>
> Calling zk while still being inside a zk callback seems a bit dangerous.  I would have a queue and event thread and have work from the callbacks feed this queue which would be executed inside the event thread.
>
>
> Regards,
> Alan
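
The queue-and-event-thread arrangement suggested above could look roughly like the following Python 3 sketch. It is only an illustration: work_queue, event_loop, re_register and the string states are hypothetical names, and re_register records the path where a real program would call the Zookeeper client.

```python
import queue
import threading

work_queue = queue.Queue()
processed = []


def event_loop():
    # Runs on its own thread; the only place where client calls would happen.
    while True:
        item = work_queue.get()
        if item is None:  # sentinel: shut down
            break
        func, args = item
        func(*args)


def re_register(path):
    # Placeholder for the real work, e.g. re-creating an ephemeral node.
    processed.append(path)


def state_listener(state):
    # The callback only enqueues work and returns immediately.
    if state == 'CONNECTED':
        work_queue.put((re_register, ('/abc/a',)))


event_thread = threading.Thread(target=event_loop)
event_thread.start()

state_listener('CONNECTED')
state_listener('SUSPENDED')  # nothing is queued for this state

work_queue.put(None)
event_thread.join()
print(processed)
```

Compared with spawning a thread per callback, a single event thread runs the queued work in the order the callbacks fired.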
>
> On Dec 9, 2012, at 2:30 PM, Matt Wise wrote:
>
>> Just to clarify, if you go and change test() into:
>>
>>>   def test(self):
>>>       # now register a node
>>>       self.register_node('/abc/a')
>>>       self._zk.stop()
>>>       self._zk.start()
>>>       self.register_node('/abc/a')
>>>       self._zk.get_children('/abc')
>>>
>>
>> and then remove these lines from the state_handler() method:
>>>
>>>>          for node in nodes.iteritems():
>>>>              self.register_node(node[0], data=node[1])
>>>
>>
>> then it works perfectly: no hang, nothing. It seems that register_node() cannot be called from within the state handler. Why?
>>
>> On Dec 9, 2012, at 2:26 PM, Matt Wise <[EMAIL PROTECTED]> wrote:
>>
>>> Hrmm here's a cleaner way to reproduce the issue:
>>>
>>> test.py:
>>>> from kazoo.client import KazooClient
>>>> from kazoo.client import KazooState
>>>> from kazoo.handlers.threading import TimeoutError
>>>> from kazoo.handlers.gevent import SequentialGeventHandler
>>>> import logging
>>>>
>>>>
>>>> class Test(object):
>>>>  def __init__(self):
>>>>      self.log = logging.getLogger()
>>>>      format = 'zk_watcher[%(name)s-%(thread)d-%(funcName)s]: (%(levelname)s) %(message)s'
>>>>      self.log.setLevel(logging.INFO)
>>>>      formatter = logging.Formatter(format)
>>>>      handler = logging.StreamHandler()
>>>>      handler.setFormatter(formatter)
>>>>      self.log.addHandler(handler)
>>>>
>>>>      self.registered_nodes = {}
>>>>
>>>>      self.log.setLevel(logging.DEBUG)
>>>>
>>>>      self._zk = KazooClient(hosts='localhost:2182', handler=SequentialGeventHandler())
>>>>      self._zk.start()
>>>>      self._zk.add_listener(self._state_listener)
>>>>      self._state_listener(self._zk.state)
>>>>
>>>>  def test(self):
>>>>      # now register a node
>>>>      self.register_node('/abc/a')
>>>>      self._zk.stop()
>>>>      self._zk.start()
>>>>      self._zk.get_children('/abc')
>>>>
>>>>  def register_node(self, node, data=None):
>>>>      if node in self.registered_nodes:
>>>>          if data == self.registered_nodes[node]:
>>>>              self.log.debug('Already registered [%s] in data provider.' % node)
>>>>              return
>>>>      self.log.debug('Registering [%s] in data provider.' % node)
>>>>      self._zk.create(node, ephemeral=True, makepath=True)
>>>>      self.registered_nodes[node] = data
>>>>
>>>>
>>>>  def _state_listener(self,state):
>>>>      self.log.warning('Zookeeper connection state changed: %s' % state)
>>>>      if state == KazooState.SUSPENDED:
>>>>          self.CONNECTION_STATE=False
>>>>      elif state == KazooState.LOST:
>>>>          self.CONNECTION_STATE=False
>>>>      else:
>>>>          self.CONNECTION_STATE=True
>>>>          nodes = {}
>>>>          print self.registered_nodes
>>>>          try:
>>>>              nodes = self.registered_nodes