Zookeeper >> mail # user >> Watch not sent immediately?


guru singh 2012-05-04, 11:06
Patrick Hunt 2012-05-09, 20:23
Re: Watch not sent immediately?
Interesting - this issue has come up several times with Curator users. I
ended up writing a Tech Note on it.

https://github.com/Netflix/curator/wiki/Tech-Note-1
-JZ
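
A minimal sketch of the effect Patrick describes in the quoted reply below, using only the Python standard library rather than the real ZooKeeper client. The SingleThreadDispatcher class and run_demo function are hypothetical stand-ins: the dispatcher plays the role of the one thread that delivers watch events, and the two inner functions mirror errors_watcher and master_watcher from the original post. A timeout is used on the wait so the demo finishes instead of hanging.

```python
import queue
import threading


class SingleThreadDispatcher:
    """Toy stand-in (an assumption, not the ZooKeeper API) for a client
    that delivers every callback on one dedicated thread."""

    def __init__(self):
        self._events = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def fire(self, callback):
        # Queue a callback for delivery on the dispatch thread.
        self._events.put(callback)

    def _run(self):
        # Callbacks run strictly one after another on this thread.
        while True:
            callback = self._events.get()
            callback()


def run_demo(timeout=0.5):
    dispatcher = SingleThreadDispatcher()
    master_updated = threading.Event()
    finished = threading.Event()
    result = {}

    def master_watcher():
        master_updated.set()

    def errors_watcher():
        # Stands in for zk.set() on /master, which queues the master watch.
        dispatcher.fire(master_watcher)
        # Blocking here stalls the only thread that could run master_watcher,
        # so this wait can only end by timing out.
        result["delivered_while_blocked"] = master_updated.wait(timeout)
        finished.set()

    dispatcher.fire(errors_watcher)
    finished.wait()
    # Once errors_watcher has returned, the dispatch thread is free again.
    result["delivered_after_return"] = master_updated.wait(timeout)
    return result


if __name__ == "__main__":
    print(run_demo())
```

In this sketch the wait inside errors_watcher times out, and master_watcher only runs once errors_watcher has returned, which matches the behaviour reported in the original post.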

On 5/9/12 1:23 PM, "Patrick Hunt" <[EMAIL PROTECTED]> wrote:

>I believe the issue is that there is a single thread updating
>watchers. If you block that thread then the event can't be delivered.
>
>Patrick
>
>On Fri, May 4, 2012 at 4:06 AM, guru singh <[EMAIL PROTECTED]> wrote:
>> Hi,
>>
>> Sorry if the subject is not appropriately titled.
>>
>> I'm trying to implement a Redis failover solution using ZooKeeper.
>> I've been working with the Python binding for zk.
>> Basically, I have a znode called /master with a watch set on it so
>> that, whenever the master changes, self.master is updated.
>> There is another znode called /errors; a watch is set on it via
>> get_children, with errors_watcher as the callback.
>> My code is supposed to loop continuously and create a child znode
>> under /errors whenever an error is detected.
>> The errors_watcher function counts the children of znode /errors.
>> If the count exceeds a certain threshold, it writes a new master
>> 'ip:port' to the znode /master, which triggers master_watcher and
>> updates self.master. I use Python's threading.Condition() to block
>> for certain operations. For instance, when znode /master is first
>> created, I wait() for master_watcher to be called, which updates
>> self.master and releases the lock. This works as expected.
>> The problem is that when znode /master is changed from within
>> errors_watcher and I then wait() for master_watcher to update
>> self.master and release the lock, the code just keeps waiting:
>> master_watcher is never called. However, if I don't wait after
>> setting znode /master from within errors_watcher, master_watcher is
>> called and it updates self.master.
>>
>> It would be really helpful if somebody could point out what's
>> wrong. Is it zk, or is my understanding of threading.Condition()
>> incorrect? Or both :)
>> Thanks for your help
>>
>> The code snippet below simulates the problem.
>>
>> class ZKtest:
>>
>>    def __init__(self,zk_server):
>>        zk.set_log_stream(open('zk.log','w'))
>>        self.master = None
>>        self.zk_server = zk_server
>>        self.connected = False
>>        self.conn_cv = threading.Condition()
>>
>>    def global_watcher(self,handle,event,state,path):
>>        self.conn_cv.acquire()
>>        print 'global watcher called'
>>        self.connected = True
>>        self.conn_cv.notifyAll()
>>        self.conn_cv.release()
>>
>>    def master_watcher(self,handle,event,state,path):
>>        self.conn_cv.acquire()
>>        print 'master watcher called'
>>        master = zk.get(self.handle,path,self.master_watcher)[0]
>>        self.master = master
>>        print 'Master is %s' %(master)
>>        self.conn_cv.notifyAll()
>>        self.conn_cv.release()
>>
>>    def errors_watcher(self,handle,event,state,path):
>>        self.conn_cv.acquire()
>>        print 'error watcher called'
>>        errors = len(zk.get_children(self.handle,'/errors',self.errors_watcher))
>>        print 'Current errors %d' %(errors)
>>        if errors > 5 :
>>            print 'Set new master, update znode /master'
>>            zk.set(self.handle,'/master','127.0.0.1:6380')
>>            #self.conn_cv.wait() <-- Why doesn't this return??
>>        self.conn_cv.notifyAll()
>>        self.conn_cv.release()
>>
>>
>>    def create_znodes(self):
>>        self.conn_cv.acquire()
>>        master = zk.exists(self.handle,'/master',self.master_watcher)
>>        if not master:
>>            print 'Creating znode /master'
>>            zk.create(self.handle,'/master','127.0.0.1:6379',
>>                      [ZOO_OPEN_ACL_UNSAFE])
>>        else :
>>            print 'Updating znode /master'
>>            zk.set(self.handle,'/master','127.0.0.1:6379',master['version'])
>>        self.conn_cv.wait() # wait until master_watcher has updated
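
One possible way around the blocked watcher, sketched with the standard library only and not taken from the thread or from the ZooKeeper bindings: keep watchers short and hand any blocking work to a separate worker thread. Everything here (run_handoff_demo, promote_new_master, the two queues) is a hypothetical illustration; the events queue stands in for the single thread that delivers watches.

```python
import queue
import threading


def run_handoff_demo(timeout=1.0):
    events = queue.Queue()   # callbacks for the single dispatch thread
    work = queue.Queue()     # blocking jobs handed off by watchers
    outcome = queue.Queue()  # result reported back to the caller
    master_updated = threading.Event()

    def loop(source):
        # Run queued callables one at a time until a None sentinel arrives.
        while True:
            job = source.get()
            if job is None:
                break
            job()

    def master_watcher():
        master_updated.set()

    def promote_new_master():
        # Stands in for zk.set() on /master, which queues the master watch.
        events.put(master_watcher)
        # Safe to block here: this runs on the worker, not the dispatcher.
        outcome.put(master_updated.wait(timeout))

    def errors_watcher():
        # Hand off and return at once so the dispatch thread stays free.
        work.put(promote_new_master)

    threads = [
        threading.Thread(target=loop, args=(events,)),
        threading.Thread(target=loop, args=(work,)),
    ]
    for t in threads:
        t.start()

    events.put(errors_watcher)
    delivered = outcome.get(timeout=timeout + 1.0)

    # Shut both loops down cleanly.
    events.put(None)
    work.put(None)
    for t in threads:
        t.join()
    return delivered


if __name__ == "__main__":
    print(run_handoff_demo())
```

Because errors_watcher returns immediately, the dispatch thread can deliver master_watcher while the worker is still waiting, so the wait completes instead of hanging.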
Patrick Hunt 2012-05-09, 20:42