ZooKeeper user mailing list


Re: Watch not sent immediately?
Interesting: this issue has come up several times with Curator users, so I
ended up writing a Tech Note on it.

https://github.com/Netflix/curator/wiki/Tech-Note-1
-JZ
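A minimal sketch of the remedy discussed in this thread: keep watcher callbacks short and do any blocking work on a separate thread. This is a plain Python 3 simulation with no ZooKeeper connection. The hypothetical `EventThread` class stands in for the client's single event thread, `worker` and the `work` queue are illustrative names, and callbacks take only a `path` argument instead of zkpython's `(handle, event, state, path)`.

```python
import queue
import threading

OLD_MASTER = "127.0.0.1:6379"
NEW_MASTER = "127.0.0.1:6380"


class EventThread:
    """Hypothetical stand-in for the client's single event thread: every
    watcher callback is queued and run, one at a time, on this thread."""

    def __init__(self):
        self._queue = queue.Queue()
        threading.Thread(target=self._loop, daemon=True).start()

    def deliver(self, callback, *args):
        self._queue.put((callback, args))

    def _loop(self):
        while True:
            callback, args = self._queue.get()
            callback(*args)  # the next event waits until this returns


events = EventThread()
work = queue.Queue()
cv = threading.Condition()
state = {"master": OLD_MASTER}
result = {}
done = threading.Event()


def master_watcher(path):
    # Runs on the event thread: record the new master (a real watcher
    # would re-read /master), wake any waiters, and return.
    with cv:
        state["master"] = NEW_MASTER
        cv.notify_all()


def errors_watcher(path):
    # Runs on the event thread: hand the event to the worker and return.
    work.put(path)


def worker():
    work.get()
    # Stands in for zk.set('/master', ...), which triggers master_watcher.
    events.deliver(master_watcher, "/master")
    with cv:
        # Blocking is safe here: this is not the event thread, so
        # master_watcher can still run and notify us.
        result["updated"] = cv.wait_for(
            lambda: state["master"] == NEW_MASTER, timeout=5)
    done.set()


threading.Thread(target=worker, daemon=True).start()
events.deliver(errors_watcher, "/errors")
done.wait(timeout=10)
print(result["updated"], state["master"])  # True 127.0.0.1:6380
```

Because `errors_watcher` only enqueues and returns, the event thread stays free to run `master_watcher` while `worker` waits on the condition.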

On 5/9/12 1:23 PM, "Patrick Hunt" <[EMAIL PROTECTED]> wrote:

>I believe the issue is that there is a single thread delivering events
>to watchers. If you block that thread inside a watcher, no further
>events can be delivered.
>
>Patrick
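Patrick's point can be reproduced without a ZooKeeper server. This is a plain Python 3 simulation under stated assumptions: the hypothetical `EventThread` class stands in for the client's single event thread, and callbacks take only a `path` argument rather than zkpython's `(handle, event, state, path)`. `errors_watcher` queues the /master event and then blocks on the very thread that would deliver it, so its wait can only end by timing out.

```python
import queue
import threading


class EventThread:
    """Hypothetical stand-in for a ZooKeeper client's event thread: every
    watcher callback is queued and run, one at a time, on this thread."""

    def __init__(self):
        self._queue = queue.Queue()
        threading.Thread(target=self._loop, daemon=True).start()

    def deliver(self, callback, *args):
        self._queue.put((callback, args))

    def _loop(self):
        while True:
            callback, args = self._queue.get()
            callback(*args)  # the next event waits until this returns


events = EventThread()
master_changed = threading.Event()
finished = threading.Event()
result = {}


def master_watcher(path):
    master_changed.set()


def errors_watcher(path):
    # Stands in for zk.set('/master', ...): the resulting watch event is
    # queued behind the callback that is running right now.
    events.deliver(master_watcher, "/master")
    # Blocking here stalls the only thread that could run master_watcher,
    # so without a timeout this wait would never return.
    result["seen_while_blocked"] = master_changed.wait(timeout=0.5)
    finished.set()


events.deliver(errors_watcher, "/errors")
finished.wait()
print(result["seen_while_blocked"])    # False
print(master_changed.wait(timeout=5))  # True, once errors_watcher returned
```

The timeout is only there so the demo terminates; with a bare `wait()`, as in the commented-out line of the original snippet, the callback would block forever.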
>
>On Fri, May 4, 2012 at 4:06 AM, guru singh <[EMAIL PROTECTED]> wrote:
>> Hi,
>>
>> Sorry if the subject is not appropriately titled.
>>
>> I'm trying to implement a redis-failover solution using ZooKeeper, and
>> I've been working with the Python binding for zk.
>>
>> Basically, I have a znode called /master with a watch set on it, so
>> that whenever the master changes, self.master is updated. There is
>> another znode called /errors, with a watch set on it (via
>> get_children) that calls the errors_watcher function. My code is
>> supposed to loop continuously and create a child znode under /errors
>> whenever an error is detected.
>>
>> The errors_watcher function counts the children of znode /errors. If
>> the count exceeds a certain threshold, it writes a new master
>> 'ip:port' to znode /master, which triggers master_watcher and updates
>> self.master. I use Python's threading.Condition() to block on certain
>> operations. For instance, when znode /master is initially created, I
>> wait() for master_watcher to be called; it updates self.master and
>> releases the lock. This works as expected. The problem is that when
>> znode /master is changed from within errors_watcher and I then wait()
>> for master_watcher to update self.master and release the lock, the
>> code just keeps waiting: master_watcher is never called. However, if
>> I don't wait after setting znode /master from within errors_watcher,
>> master_watcher is called and it updates self.master.
>>
>> It would be really helpful if somebody could point out what's wrong.
>> Is it zk, or is my understanding of threading.Condition() incorrect?
>> Or both :)
>> Thanks for your help
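On the threading.Condition() question: as far as this thread's diagnosis goes, the Condition is not the problem; `wait()` releases the lock and blocks the calling thread until another thread calls `notify_all()`. Below is a sketch of a more defensive wait using Python 3's `Condition.wait_for()` with a predicate and a timeout, so a missed notification shows up as `False` instead of a hang. `set_master` and `wait_for_master` are hypothetical helpers, and `threading.Timer` stands in for the thread that would deliver the watch event.

```python
import threading

cv = threading.Condition()
state = {"master": None}


def set_master(value):
    # Plays the role of master_watcher: update shared state, wake waiters.
    with cv:
        state["master"] = value
        cv.notify_all()


def wait_for_master(expected, timeout):
    # Re-checks the predicate after every wake-up and gives up after
    # `timeout` seconds, returning False rather than blocking forever.
    with cv:
        return cv.wait_for(lambda: state["master"] == expected, timeout=timeout)


# Another thread delivers the update, as the watch event thread would.
threading.Timer(0.1, set_master, args=("127.0.0.1:6380",)).start()

first = wait_for_master("127.0.0.1:6380", timeout=5)
second = wait_for_master("127.0.0.1:6381", timeout=0.2)  # never set
print(first, second)  # True False
```

A timeout does not fix the deadlock, but it turns a silent hang into a result the caller can log and act on.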
>>
>> The code snippet below simulates the problem.
>>
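>> import threading
>> import zookeeper as zk  # zkpython binding; alias assumed from usage below
>>
>> class ZKtest: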
>>
>>    def __init__(self,zk_server):
>>        zk.set_log_stream(open('zk.log','w'))
>>        self.master = None
>>        self.zk_server = zk_server
>>        self.connected = False
>>        self.conn_cv = threading.Condition()
>>
>>    def global_watcher(self,handle,event,state,path):
>>        self.conn_cv.acquire()
>>        print 'global watcher called'
>>        self.connected = True
>>        self.conn_cv.notifyAll()
>>        self.conn_cv.release()
>>
>>    def master_watcher(self,handle,event,state,path):
>>        self.conn_cv.acquire()
>>        print 'master watcher called'
>>        master = zk.get(self.handle,path,self.master_watcher)[0]
>>        self.master = master
>>        print 'Master is %s' %(master)
>>        self.conn_cv.notifyAll()
>>        self.conn_cv.release()
>>
>>    def errors_watcher(self,handle,event,state,path):
>>        self.conn_cv.acquire()
>>        print 'error watcher called'
>>        errors = len(zk.get_children(self.handle,'/errors',
>>                                     self.errors_watcher))
>>        print 'Current errors %d' %(errors)
>>        if errors > 5 :
>>            print 'Set new master, update znode /master'
>>            zk.set(self.handle,'/master','127.0.0.1:6380')
>>            #self.conn_cv.wait() <-- Why doesn't this return??
>>        self.conn_cv.notifyAll()
>>        self.conn_cv.release()
>>
>>
>>    def create_znodes(self):
>>        self.conn_cv.acquire()
>>        master = zk.exists(self.handle,'/master',self.master_watcher)
>>        if not master:
>>            print 'Creating znode /master'
>>            zk.create(self.handle,'/master','127.0.0.1:6379',
>>                      [ZOO_OPEN_ACL_UNSAFE])
>>        else :
>>            print 'Updating znode /master'
>>            zk.set(self.handle,'/master','127.0.0.1:6379',
>>                   master['version'])
>>        self.conn_cv.wait() # wait until master_watcher has updated