Zookeeper, mail # user - Kazoo 'state listener' issue...


Matt Wise 2012-12-09, 21:22
Matt Wise 2012-12-09, 22:26
Matt Wise 2012-12-09, 22:30
Alan Cabrera 2012-12-10, 19:18
Matt Wise 2012-12-22, 06:00
Re: Kazoo 'state listener' issue...
Alan Cabrera 2012-12-24, 17:28

On Dec 21, 2012, at 10:00 PM, Matt Wise wrote:

> I just want to circle back on this issue. The root cause was that the Kazoo thread that triggers the 'state listener' callback holds a lock and does not release it until the callbacks have finished. If your callback tries to make a Zookeeper call (e.g., get, set, create, delete) on that same thread, it waits on that lock and deadlocks immediately.

Ahh, calling user callbacks while holding a lock.  That would do it.  :)

> To handle this scenario they have a 'spawn()' function in the Zookeeper thread handler that you can use:
>
>>            self._zk.handler.spawn(self._re_establish_registrations)
>
> This spawns a thread and immediately returns, allowing the Zookeeper thread to finish its callbacks and release the lock. Thanks a ton to Ben B. for helping me track that down.

Yeah, Ben is awesome.
Regards,
Alan
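
For readers following along, the deadlock and the spawn() workaround described above can be reproduced without a Zookeeper server. The sketch below is only a toy model built on the standard library: ToyClient, fire_state_change and the two demo functions are hypothetical names invented for illustration, and nothing here is Kazoo's actual implementation. It assumes a plain non-reentrant lock held while listeners run, and uses a timeout so the blocked call reports the problem instead of hanging.

```python
import threading


class ToyClient:
    """Toy stand-in for a client that runs listeners while holding a lock.

    Hypothetical class for illustration only; this is not Kazoo code.
    """

    def __init__(self):
        self._lock = threading.Lock()  # non-reentrant on purpose
        self._listeners = []
        self.created = []

    def add_listener(self, listener):
        self._listeners.append(listener)

    def create(self, path, timeout=1.0):
        # Every operation needs the lock. Give up after `timeout` seconds
        # so the demo reports the deadlock instead of hanging forever.
        if not self._lock.acquire(timeout=timeout):
            raise TimeoutError("lock is still held by the callback thread")
        try:
            self.created.append(path)
        finally:
            self._lock.release()

    def spawn(self, func, *args):
        # Run func on a new thread and return immediately.
        thread = threading.Thread(target=func, args=args)
        thread.start()
        return thread

    def fire_state_change(self, state):
        # Listeners run while the lock is held, as in the report above.
        with self._lock:
            for listener in self._listeners:
                listener(state)


def blocking_listener_demo():
    """The listener calls create() directly and cannot get the lock."""
    client = ToyClient()
    errors = []

    def listener(state):
        try:
            client.create('/abc/a', timeout=0.2)
        except TimeoutError as exc:
            errors.append(exc)

    client.add_listener(listener)
    client.fire_state_change('CONNECTED')
    return client.created, errors


def spawning_listener_demo():
    """The listener hands the work to another thread and returns at once."""
    client = ToyClient()
    threads = []

    def listener(state):
        threads.append(client.spawn(client.create, '/abc/a'))

    client.add_listener(listener)
    client.fire_state_change('CONNECTED')
    for thread in threads:
        thread.join()
    return client.created
```

In the blocking variant nothing is created and one timeout is recorded; in the spawning variant the listener returns, the lock is released, and the spawned thread then completes the create.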

> On Dec 10, 2012, at 11:18 AM, Alan Cabrera <[EMAIL PROTECTED]> wrote:
>
>> Without really digging into this I'll toss in my initial observation.
>>
>> Calling zk while still inside a zk callback seems a bit dangerous.  I would have a queue and an event thread: the callbacks feed work into the queue, and the event thread executes it.
>>
>>
>> Regards,
>> Alan
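
The queue-plus-event-thread idea suggested above could look roughly like the following. This is a minimal standard-library sketch, not code from the thread: EventWorker, submit, close and run_demo are hypothetical names, and the state strings are plain placeholders rather than real KazooState values.

```python
import logging
import queue
import threading


class EventWorker:
    """Runs queued jobs one at a time on a dedicated event thread."""

    _STOP = object()  # sentinel that tells the event thread to exit

    def __init__(self):
        self._jobs = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, func, *args):
        # Safe to call from inside a callback: it only enqueues and returns.
        self._jobs.put((func, args))

    def close(self):
        # Jobs queued before the sentinel still run, in FIFO order.
        self._jobs.put(self._STOP)
        self._thread.join()

    def _run(self):
        while True:
            job = self._jobs.get()
            if job is self._STOP:
                break
            func, args = job
            try:
                func(*args)
            except Exception:
                # One failing job should not kill the event thread.
                logging.getLogger(__name__).exception("queued job failed")


def run_demo():
    worker = EventWorker()
    seen = []

    def state_listener(state):
        # The callback does no real work itself; it defers to the worker.
        worker.submit(seen.append, state)

    for state in ('SUSPENDED', 'CONNECTED'):
        state_listener(state)
    worker.close()
    return seen
```

Compared with spawning a thread per callback, a single event thread keeps the deferred work in the order the callbacks fired, at the cost of one slow job delaying the ones behind it.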
>>
>> On Dec 9, 2012, at 2:30 PM, Matt Wise wrote:
>>
>>> Just to clarify, if you go and change test() into:
>>>
>>>>  def test(self):
>>>>      # now register a node
>>>>      self.register_node('/abc/a')
>>>>      self._zk.stop()
>>>>      self._zk.start()
>>>>      self.register_node('/abc/a')
>>>>      self._zk.get_children('/abc')
>>>>
>>>
>>> and then remove these lines from the state_handler() method:
>>>>
>>>>>         for node in nodes.iteritems():
>>>>>             self.register_node(node[0], data=node[1])
>>>>
>>>
>>> then it works perfectly: no hang, nothing. It seems that register_node cannot be called from within the state handler. Why?
>>>
>>> On Dec 9, 2012, at 2:26 PM, Matt Wise <[EMAIL PROTECTED]> wrote:
>>>
>>>> Hrmm here's a cleaner way to reproduce the issue:
>>>>
>>>> test.py:
>>>>> from kazoo.client import KazooClient
>>>>> from kazoo.client import KazooState
>>>>> from kazoo.handlers.threading import TimeoutError
>>>>> from kazoo.handlers.gevent import SequentialGeventHandler
>>>>> import logging
>>>>>
>>>>>
>>>>> class Test(object):
>>>>>     def __init__(self):
>>>>>         self.log = logging.getLogger()
>>>>>         format = 'zk_watcher[%(name)s-%(thread)d-%(funcName)s: (%(levelname)s) %(message)s'
>>>>>         self.log.setLevel(logging.INFO)
>>>>>         formatter = logging.Formatter(format)
>>>>>         handler = logging.StreamHandler()
>>>>>         handler.setFormatter(formatter)
>>>>>         self.log.addHandler(handler)
>>>>>
>>>>>         self.registered_nodes = {}
>>>>>
>>>>>         self.log.setLevel(logging.DEBUG)
>>>>>
>>>>>         self._zk = KazooClient(hosts='localhost:2182', handler=SequentialGeventHandler())
>>>>>         self._zk.start()
>>>>>         self._zk.add_listener(self._state_listener)
>>>>>         self._state_listener(self._zk.state)
>>>>>
>>>>>     def test(self):
>>>>>         # now register a node
>>>>>         self.register_node('/abc/a')
>>>>>         self._zk.stop()
>>>>>         self._zk.start()
>>>>>         self._zk.get_children('/abc')
>>>>>
>>>>>     def register_node(self, node, data=None):
>>>>>         if node in self.registered_nodes:
>>>>>             if data == self.registered_nodes[node]:
>>>>>                 self.log.debug('Already registered [%s] in data provider.' % node)
>>>>>                 return
>>>>>         self.log.debug('Registering [%s] in data provider.' % node)
>>>>>         self._zk.create(node, ephemeral=True, makepath=True)
>>>>>         self.registered_nodes[node] = data
>>>>>
>>>>>
>>>>>     def _state_listener(self,state):
>>>>>         self.log.warning('Zookeeper connection state changed: %s' % state)
>>>>>         if state == KazooState.SUSPENDED:
>>>>>             self.CONNECTION_STATE=False
>>>>>         elif state == KazooState.LOST:
>>>>>             self.CONNECTION_STATE=False