

Matt Wise 2012-12-09, 21:22
Matt Wise 2012-12-09, 22:26
Matt Wise 2012-12-09, 22:30
Re: Kazoo 'state listener' issue...
Without really digging into this, I'll toss in my initial observation.

Calling ZooKeeper while still inside a ZooKeeper callback seems a bit dangerous. I would have a queue and an event thread: the callbacks feed work into the queue, and the event thread executes it.
Regards,
Alan
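
The queue-and-event-thread idea above could be sketched roughly as follows. This is a minimal illustration using only the Python 3 standard library, not Kazoo itself: CallbackWorker, state_listener, and the "CONNECTED" string are hypothetical names, and register_node here is a stand-in for the real call that would talk to ZooKeeper. The test script in this thread uses SequentialGeventHandler, so a gevent-based setup would probably want a greenlet and a gevent queue in place of the thread used here.

```python
import queue
import threading


class CallbackWorker:
    """Runs deferred work on its own thread, outside any client callback."""

    def __init__(self):
        self._tasks = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, func, *args, **kwargs):
        # Called from inside a callback: enqueue the work and return at once.
        self._tasks.put((func, args, kwargs))

    def _run(self):
        while True:
            item = self._tasks.get()
            if item is None:
                # Sentinel from stop(): everything queued before it has run.
                return
            func, args, kwargs = item
            func(*args, **kwargs)

    def stop(self):
        # Queue the sentinel behind any pending work, then wait for the thread.
        self._tasks.put(None)
        self._thread.join()


registered = {}


def register_node(path, data=None):
    # Stand-in (assumption) for a blocking client call such as create().
    registered[path] = data


worker = CallbackWorker()


def state_listener(state):
    # Hypothetical listener: it makes no client calls itself, it only
    # hands the re-registration work to the worker thread.
    if state == "CONNECTED":
        for path, data in [("/abc/a", None)]:
            worker.submit(register_node, path, data=data)


state_listener("CONNECTED")
worker.stop()
print(registered)
```

The point of the design is that the listener returns immediately, so whatever thread or greenlet delivers connection events is never blocked waiting on a request that it would itself have to service. Kazoo's handler objects also appear to provide a spawn method intended for this kind of hand-off; check the documentation for your version before relying on it.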

On Dec 9, 2012, at 2:30 PM, Matt Wise wrote:

> Just to clarify, if you go and change test() into:
>
>>    def test(self):
>>        # now register a node
>>        self.register_node('/abc/a')
>>        self._zk.stop()
>>        self._zk.start()
>>        self.register_node('/abc/a')
>>        self._zk.get_children('/abc')
>>
>
> and then remove these lines from the _state_listener() method:
>>
>>>           for node in nodes.iteritems():
>>>               self.register_node(node[0], data=node[1])
>>
>
> then it works perfectly: no hang, nothing. It seems that register_node() cannot be called from within the state listener. Why?
>
> On Dec 9, 2012, at 2:26 PM, Matt Wise <[EMAIL PROTECTED]> wrote:
>
>> Hrmm here's a cleaner way to reproduce the issue:
>>
>> test.py:
>>> from kazoo.client import KazooClient
>>> from kazoo.client import KazooState
>>> from kazoo.handlers.threading import TimeoutError
>>> from kazoo.handlers.gevent import SequentialGeventHandler
>>> import logging
>>>
>>>
>>> class Test(object):
>>>   def __init__(self):
>>>       self.log = logging.getLogger()
>>>       format = 'zk_watcher[%(name)s-%(thread)d-%(funcName)s]: (%(levelname)s) %(message)s'
>>>       self.log.setLevel(logging.INFO)
>>>       formatter = logging.Formatter(format)
>>>       handler = logging.StreamHandler()
>>>       handler.setFormatter(formatter)
>>>       self.log.addHandler(handler)
>>>
>>>       self.registered_nodes = {}
>>>
>>>       self.log.setLevel(logging.DEBUG)
>>>
>>>       self._zk = KazooClient(hosts='localhost:2182', handler=SequentialGeventHandler())
>>>       self._zk.start()
>>>       self._zk.add_listener(self._state_listener)
>>>       self._state_listener(self._zk.state)
>>>
>>>   def test(self):
>>>       # now register a node
>>>       self.register_node('/abc/a')
>>>       self._zk.stop()
>>>       self._zk.start()
>>>       self._zk.get_children('/abc')
>>>
>>>   def register_node(self, node, data=None):
>>>       if node in self.registered_nodes:
>>>           if data == self.registered_nodes[node]:
>>>               self.log.debug('Already registered [%s] in data provider.' % node)
>>>               return
>>>       self.log.debug('Registering [%s] in data provider.' % node)
>>>       self._zk.create(node, ephemeral=True, makepath=True)
>>>       self.registered_nodes[node] = data
>>>
>>>
>>>   def _state_listener(self,state):
>>>       self.log.warning('Zookeeper connection state changed: %s' % state)
>>>       if state == KazooState.SUSPENDED:
>>>           self.CONNECTION_STATE=False
>>>       elif state == KazooState.LOST:
>>>           self.CONNECTION_STATE=False
>>>       else:
>>>           self.CONNECTION_STATE=True
>>>           nodes = {}
>>>           print self.registered_nodes
>>>           try:
>>>               nodes = self.registered_nodes
>>>           except:
>>>               pass
>>>           self.registered_nodes = {}
>>>           for node in nodes.iteritems():
>>>               self.register_node(node[0], data=node[1])
>>
>> python
>>>>> import test
>>>>> k = test.Test()
>>>>> k.test()
>>
>> (watch it hang ... )
>>
>>
>> On Dec 9, 2012, at 1:22 PM, Matt Wise <[EMAIL PROTECTED]> wrote:
>>
>>> I've got a weird connection issue playing around with Kazoo... If I do something simple like:
>>>
>>>> k = KazooClient()
>>>> k.start()
>>>> k.create('/foo')
>>>> k.stop()
>>>> k.start()
>>>> k.create('/foo')
>>>
>>> it works fine... the node is re-created, all is happy.
>>>
>>> However, if I try to use a state_listener callback to automatically re-register any paths that we had registered on our first connection, it fails. In fact, it doesn't really fail; it hangs. This only happens if we try to re-register the paths from within the state listener. If we do it outside of that callback (manually), it works fine. Silly code snippet that will cause the problem:
Matt Wise 2012-12-22, 06:00
Alan Cabrera 2012-12-24, 17:28