Interesting: this issue has come up several times with Curator users, so I
ended up writing a Tech Note on it.
On 5/9/12 1:23 PM, "Patrick Hunt" <[EMAIL PROTECTED]> wrote:
>I believe the issue is that a single thread delivers events to all
>watchers. If you block that thread inside a watcher callback, no further
>events (including the one you are waiting for) can be delivered.
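Patrick's explanation can be reproduced without a ZooKeeper server. The following is a minimal Python 3 sketch. It assumes a hypothetical `EventDispatcher` that works like the client's event thread: it runs every watcher callback one at a time on a single thread. `blocking_watcher` and `handoff_watcher` are also illustrative names, not part of any ZooKeeper binding. A callback that waits on the dispatcher thread for a later event can never be woken, because that event is queued behind it. Moving the wait onto a separate thread lets the callback return, so the event can be delivered.

```python
import queue
import threading


class EventDispatcher:
    """Hypothetical stand-in for a client's single event thread:
    callbacks are queued and run one at a time, in order."""

    def __init__(self):
        self._events = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def post(self, callback):
        self._events.put(callback)

    def _run(self):
        while True:
            callback = self._events.get()
            callback()  # a callback that blocks here stalls every later event


def blocking_watcher(dispatcher, timeout=0.5):
    """Posts a follow-up event, then waits for it ON the dispatcher thread."""
    delivered = threading.Event()
    done = threading.Event()
    result = {}

    def errors_watcher():
        dispatcher.post(delivered.set)  # analogous to writing /master
        # The follow-up event is queued behind this callback, so it cannot
        # run until this callback returns: the wait always times out.
        result["ok"] = delivered.wait(timeout)
        done.set()

    dispatcher.post(errors_watcher)
    done.wait()
    return result["ok"]


def handoff_watcher(dispatcher, timeout=0.5):
    """Posts a follow-up event and waits for it on a separate thread."""
    delivered = threading.Event()
    done = threading.Event()
    result = {}

    def wait_elsewhere():
        # Runs on its own thread, so the dispatcher stays free.
        result["ok"] = delivered.wait(timeout)
        done.set()

    def errors_watcher():
        dispatcher.post(delivered.set)
        threading.Thread(target=wait_elsewhere).start()
        # Return immediately so the dispatcher can deliver the next event.

    dispatcher.post(errors_watcher)
    done.wait()
    return result["ok"]


dispatcher = EventDispatcher()
print(blocking_watcher(dispatcher))  # False: the wait timed out
print(handoff_watcher(dispatcher))   # True: the event was delivered
```

The same principle applies in a real client: do not block inside a watcher callback. Instead, hand the work to another thread, or do the waiting on a thread that is not the event thread.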
>On Fri, May 4, 2012 at 4:06 AM, guru singh <[EMAIL PROTECTED]> wrote:
>> Sorry if the subject line is not quite right.
>> I'm trying to implement a redis-failover solution using ZooKeeper.
>> I've been working with the Python binding for zk.
>> I have a znode called /master with a watch set on it, so that
>> whenever the master changes, self.master is updated.
>> There is another znode called /errors, on which a watch is set via
>> get_children, with errors_watcher as the callback.
>> My code loops continuously and creates a child znode under /errors
>> whenever an error is detected.
>> The errors_watcher function counts the children of /errors; if the
>> count exceeds a certain limit, it writes a new master 'ip:port' to
>> /master, which triggers master_watcher and updates self.master.
>> I use Python's threading.Condition() to block during certain
>> operations. For instance, when /master is first created, I wait() for
>> master_watcher to be called; it updates self.master and releases the
>> lock. This works as expected. The problem is that when /master is
>> changed from within errors_watcher and I then wait() for
>> master_watcher to update self.master and release the lock, the code
>> just keeps waiting: master_watcher is never called. If I don't wait
>> after setting /master from within errors_watcher, master_watcher is
>> called and updates self.master.
>> It would be really helpful if somebody could point out what's wrong.
>> Is it zk, or is my understanding of threading.Condition() incorrect?
>> Or both :)
>> Thanks for your help
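The question also asks about threading.Condition(). `wait()` must be called while the thread holds the condition's lock, or it raises RuntimeError. It returns only after another thread calls `notify()` or the timeout expires. The sketch below assumes a hypothetical `MasterHolder` class and a made-up address `10.0.0.2:6379`. It uses `wait_for()` (Python 3.2+) with a predicate, so a `notify()` that fires before the waiter starts waiting is not missed.

```python
import threading


class MasterHolder:
    """Hypothetical holder for the current master address."""

    def __init__(self):
        self.master = None
        self._cv = threading.Condition()

    def on_master_changed(self, new_master):
        # Called from another thread (e.g. a watcher-style callback).
        with self._cv:
            self.master = new_master
            self._cv.notify_all()

    def wait_for_master(self, expected, timeout=1.0):
        # The lock must be held around wait()/wait_for(); calling wait()
        # without it raises RuntimeError. wait_for() re-checks the
        # predicate, so an update that already happened returns at once.
        with self._cv:
            return self._cv.wait_for(lambda: self.master == expected, timeout)


holder = MasterHolder()
# Deliver the update from a different thread after a short delay.
threading.Timer(0.1, holder.on_master_changed, args=("10.0.0.2:6379",)).start()
print(holder.wait_for_master("10.0.0.2:6379"))  # True
```

This works only because `on_master_changed` runs on a different thread (here a `threading.Timer`) from the one blocked in `wait_for_master`. If the waiting code ran on the thread that must deliver the update, as errors_watcher does on the client's event thread, the wait could only time out.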
>> The code snippet below simulates the problem.
>> import threading
>> import zookeeper as zk  # the zkpython binding
>>
>> class ZKtest:
>>     def __init__(self, zk_server):
>>         self.master = None
>>         self.zk_server = zk_server
>>         self.connected = False
>>         self.conn_cv = threading.Condition()
>>
>>     def global_watcher(self, handle, event, state, path):
>>         print 'global watcher called'
>>         self.connected = True
>>
>>     def master_watcher(self, handle, event, state, path):
>>         print 'master watcher called'
>>         # zk.get returns (data, stat) and re-registers this watcher
>>         master, stat = zk.get(self.handle, path, self.master_watcher)
>>         with self.conn_cv:
>>             self.master = master
>>             self.conn_cv.notify()  # wake the thread blocked in wait()
>>         print 'Master is %s' % master
>>
>>     def errors_watcher(self, handle, event, state, path):
>>         print 'error watcher called'
>>         # count the children of /errors and re-register this watcher
>>         errors = len(zk.get_children(self.handle, '/errors',
>>                                      self.errors_watcher))
>>         print 'Current errors %d' % errors
>>         if errors > 5:
>>             print 'Set new master, update znode /master'
>>             #self.conn_cv.wait() <-- Why doesn't this return??
>>
>>     def create_znodes(self):
>>         master = zk.exists(self.handle, '/master', self.master_watcher)
>>         if not master:
>>             print 'Creating znode /master'
>>         else:
>>             print 'Updating znode /master'
>>         with self.conn_cv:
>>             self.conn_cv.wait()  # wait until master_watcher has updated