-
Using ZK for real-time group membership notification
Otis Gospodnetic 2011-03-18, 21:02
Hi,

Short version: How can ZK be used to make sure that all group members know *the exact number of members at all times*?

I have an app that can be run on 1 or more servers. New instances of the app come and go, may die, etc.; the number of app instances is completely dynamic. At any one time, as these apps come and go, each live instance of the app needs to know how many instances there are in total. If a new instance of the app is started, all instances need to know the new total number of instances. If an app is stopped or dies, the remaining apps need to know the new number of app instances.

Also, and this is critical, they need to know about these additions/removals of apps right away, and they all need to find out about them at the same time. Basically, all members of some group need to know *the exact number of members at all times*.

This sounds almost like we need to watch a "parent group znode" and monitor the number of its ephemeral children, which represent each app instance that is watching the "parent group znode". Is that right? If so, then all I'd need to know is the answer to "How many watchers are watching this znode?" or "How many kids does this znode have?". And I'd need ZK to notify all watchers whenever the answer to this question changes. Ideally it would send/push the answer (the number of watchers) to all watchers, but if not, I assume any watcher that is notified about the change would go poll ZK to get the number of ephemeral kids.

I think the above is essentially what's described on http://eng.wealthfront.com/2010/01/actually-implementing-group-management.html , but it doesn't answer the part that's critical for me (the very first question above).

Thanks,
Otis
----
Sematext :: http://sematext.com/ :: Solr - Lucene - Nutch
Lucene ecosystem search :: http://search-lucene.com/
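The watch-and-re-read pattern described in the question can be modelled in a few lines. This is a hypothetical in-memory toy (`ToyParentZnode` and `Member` are illustrative names, not the ZooKeeper client API) that mirrors two real ZooKeeper properties: child watches are one-time triggers, and the child-changed notification does not carry the new child list, so each watcher re-reads the children, re-arming its watch in the same call. It does not model network delay or session timeouts; every callback here runs instantly, which real clients never get.

```python
from typing import Callable, Dict, List, Optional


class ToyParentZnode:
    """In-memory stand-in for a parent znode whose children are ephemeral."""

    def __init__(self) -> None:
        self._children: Dict[str, str] = {}  # child name -> owning session id
        self._watchers: List[Callable[[], None]] = []

    def get_children(self, watch: Optional[Callable[[], None]] = None) -> List[str]:
        # Reading the children is also how a client (re-)registers its watch.
        if watch is not None:
            self._watchers.append(watch)
        return sorted(self._children)

    def _fire(self) -> None:
        # One-shot semantics: detach the current watchers before calling them,
        # so a callback that re-registers lands in a fresh list.
        pending, self._watchers = self._watchers, []
        for callback in pending:
            callback()  # note: no data is passed, only "something changed"

    def create_ephemeral(self, name: str, session: str) -> None:
        self._children[name] = session
        self._fire()

    def expire_session(self, session: str) -> None:
        # An expired session takes all of its ephemeral children with it.
        gone = [n for n, s in self._children.items() if s == session]
        for n in gone:
            del self._children[n]
        if gone:
            self._fire()


class Member:
    """A group member that caches the member count it last read."""

    def __init__(self, parent: ToyParentZnode) -> None:
        self.parent = parent
        self.known_count = 0
        self.refresh()

    def refresh(self) -> None:
        # Re-read the child list and re-arm the watch in one call.
        self.known_count = len(self.parent.get_children(watch=self.refresh))
```

The cached `known_count` is only as fresh as the member's last notification, which is exactly the gap the replies below point out.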
-
Re: Using ZK for real-time group membership notification
Benjamin Reed 2011-03-18, 21:59
In a distributed setting such an answer is impossible, especially given the theory of relativity and the speed of light. A machine may fail right after sending a heartbeat, or another may come online right after sending a report. Even if ZooKeeper could provide this, you would still have thread-scheduling issues on a local machine that mean you are operating on old information.

To deal with this, applications can use views, which allow clients to reconcile differences. For example, if two processes communicate and one has a different list of members than the other, they can both consult ZooKeeper to reconcile, or use the membership list with the highest zxid. The other option is to count on everyone eventually converging.

I would not develop a distributed system with the assumption that "all group members know *the exact number of members at all times*".

ben
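Ben's "use the membership list with the highest zxid" rule can be sketched as a tiny reconciliation function. This is a minimal illustration, assuming each process tags its membership list with the zxid at which it was read (for instance, the parent znode's `pzxid`, the zxid of the last change to its children); `MembershipView` and `reconcile` are hypothetical names. Because ZooKeeper orders all updates totally by zxid, the list read at the higher zxid already reflects every membership change the other list saw.

```python
from dataclasses import dataclass
from typing import FrozenSet


@dataclass(frozen=True)
class MembershipView:
    zxid: int                # zxid at which this membership list was read
    members: FrozenSet[str]  # live members as of that zxid


def reconcile(mine: MembershipView, theirs: MembershipView) -> MembershipView:
    """Return the newer of two views; ties mean both saw the same state."""
    return mine if mine.zxid >= theirs.zxid else theirs
```

Two processes that disagree can each call `reconcile` on the pair of views and end up with the same answer without another round trip to ZooKeeper, though that answer may itself already be stale.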
-
Re: Using ZK for real-time group membership notification
Ted Dunning 2011-03-18, 22:51
For a very close to correct answer, take a look at KeptCollections.
-
Re: Using ZK for real-time group membership notification
Otis Gospodnetic 2011-03-19, 04:41
Hi,

Thanks Ben. Let me describe the context a bit more; once you know what I'm trying to do, you may have suggestions for how to solve the problem with or without ZK.

I have a continuous "stream" of documents that I need to process. The stream is pretty fast (I don't know the exact number, but it's many docs a second). Docs live only in memory in that stream and cannot be saved to disk at any point up the stream.

My app listens to this stream. Because of the high document ingestion rate, I need N instances of my app to listen to this stream. So all N apps listen and they all "get" the same documents, but only 1 app actually processes each document: "if (docID mod N == appID) then process doc", the usual consistent hashing stuff. I'd like to be able to add and remove apps dynamically and have the remaining apps realize that "N" has changed. Similarly, when some app instance dies and thus "N" changes, I'd like all the remaining instances to know about it.

If my apps don't know the correct "N", then 1/Nth of docs will go unprocessed (if the app died or was removed) until the remaining apps adjust their local value of "N".

> to deal with this applications can use views, which allow clients to
> reconcile differences. for example, if two processes communicate and

Hm, this requires apps to communicate with each other. If each app were aware of the other apps, then I could get the membership count directly using that mechanism, although I still wouldn't be able to immediately detect when some app died; at least, I'm not sure how I could do that.

> one has a different list of members than the other then they can both
> consult zookeeper to reconcile or use the membership list with the
> highest zxid. the other option is to count on eventually everyone
> converging.
Right, if I could live with eventually having the right "N", then I could use ZK as described on http://eng.wealthfront.com/2010/01/actually-implementing-group-management.html

But say I'm OK with "eventually everyone converging". Can I use ZK then? And if so, how "eventually" is this "eventually"? That is, if an app dies, how quickly can ZK notify all watchers of that znode change? A few milliseconds or more?

In general, how does one deal with situations like the one I described above, where each app is responsible for 1/Nth of the work and where N can change uncontrollably and unexpectedly?

Thanks!
Otis
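A small simulation makes the cost of the `docID mod N == appID` rule concrete. While survivors have not yet learned that an app died, they skip exactly that app's 1/N share; once N is updated (and appIDs renumbered to 0..N-1), most documents change owner: going from 4 apps to 3 reassigns 3/4 of them. Strictly speaking, mod-N partitioning is not consistent hashing, so for contrast the sketch also includes rendezvous (highest-random-weight) hashing, a consistent-hashing-style scheme in which removing an app only moves that app's documents. All function names are hypothetical, and neither scheme closes the window in which a dead app's share goes unprocessed.

```python
import hashlib
from typing import Sequence


def owner_mod_n(doc_id: int, n: int) -> int:
    # The scheme from the thread: app number (docID mod N) processes the doc.
    return doc_id % n


def owner_rendezvous(doc_id: int, apps: Sequence[str]) -> str:
    # Rendezvous hashing: the doc goes to the app with the largest
    # hash(app, doc_id), so removing an app moves only the docs it owned.
    def weight(app: str) -> int:
        digest = hashlib.sha256(f"{app}:{doc_id}".encode()).digest()
        return int.from_bytes(digest[:8], "big")
    return max(apps, key=weight)


def fraction_unprocessed(n_believed: int, dead_app: int, docs: range) -> float:
    # While survivors still believe N == n_believed, nobody takes the dead
    # app's share of the documents.
    skipped = sum(1 for d in docs if owner_mod_n(d, n_believed) == dead_app)
    return skipped / len(docs)


def fraction_moved_mod_n(n_before: int, n_after: int, docs: range) -> float:
    moved = sum(1 for d in docs
                if owner_mod_n(d, n_before) != owner_mod_n(d, n_after))
    return moved / len(docs)


def fraction_moved_rendezvous(before: Sequence[str], after: Sequence[str],
                              docs: range) -> float:
    moved = sum(1 for d in docs
                if owner_rendezvous(d, before) != owner_rendezvous(d, after))
    return moved / len(docs)


if __name__ == "__main__":
    docs = range(12000)
    print(fraction_unprocessed(4, 2, docs))  # 0.25: app 2 died, N still 4
    print(fraction_moved_mod_n(4, 3, docs))  # 0.75: most docs change owner
    apps = ["app-a", "app-b", "app-c", "app-d"]
    # Roughly a quarter: only app-c's documents move.
    print(fraction_moved_rendezvous(apps, ["app-a", "app-b", "app-d"], docs))
```

Reducing reassignment matters here because every document that changes owner is another chance for two apps to disagree about who should process it while their views of N differ.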
-
Re: Using ZK for real-time group membership notification
Michi Mutsuzaki 2011-03-19, 05:05
Hi Otis,

Would it be possible to change your app to use a producer-consumer queue? That way, no document will go unprocessed when an instance goes down.

http://zookeeper.apache.org/doc/r3.3.2/zookeeperTutorial.html#sc_producerConsumerQueues

--Michi
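The producer-consumer queue Michi links can be sketched along the lines of the ZooKeeper tutorial's recipe: producers create sequential children under a queue znode, and a consumer lists the children, takes the lowest-numbered one, reads it, and claims it by deleting it, so only the consumer whose delete succeeds owns the element. This is a hypothetical in-memory model (`ToyQueueZnode`, `consume` and the `element` prefix are illustrative), not the tutorial's code. Note that delete-to-claim means an element claimed by a consumer that then crashes before processing it is lost, the gap that the transaction logic mentioned later in this thread would have to cover.

```python
from typing import Dict, List, Optional


class ToyQueueZnode:
    """In-memory stand-in for a queue znode with sequential children."""

    def __init__(self) -> None:
        self._children: Dict[str, bytes] = {}
        self._counter = 0

    def create_sequential(self, prefix: str, data: bytes) -> str:
        # ZooKeeper appends a 10-digit, zero-padded counter to sequential
        # nodes, so for children sharing a prefix, sorted order = creation order.
        name = f"{prefix}{self._counter:010d}"
        self._counter += 1
        self._children[name] = data
        return name

    def get_children(self) -> List[str]:
        return list(self._children)

    def get_data(self, name: str) -> Optional[bytes]:
        return self._children.get(name)

    def delete(self, name: str) -> bool:
        # False where real ZooKeeper would report that the node no longer
        # exists, i.e. another consumer already claimed this element.
        return self._children.pop(name, None) is not None


def consume(queue: ToyQueueZnode) -> Optional[bytes]:
    """Claim the oldest element, skipping ones another consumer won first.
    Returns None when the queue is empty."""
    for name in sorted(queue.get_children()):
        data = queue.get_data(name)
        if data is not None and queue.delete(name):
            return data
    return None
```

Unlike the mod-N scheme, no consumer needs to know N: whichever instance is alive and free simply takes the next element.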
-
Re: Using ZK for real-time group membership notification
Mathias Herberts 2011-03-19, 06:08
Maybe the S4 (s4.io) approach could prove helpful.
-
Re: Using ZK for real-time group membership notification
Ted Dunning 2011-03-19, 15:55
I would recommend using KeptCollections to get a close estimate of the currently live machines.
When you need to process a document, select a processor at random from the collection. Then if that processor manages to tell you that the document has been accepted and successfully processed, you can move to the next document. If you don't get confirmation for any reason (timeout, connection loss, whatever), you pick another random processor from the collection (using the kept collection so that you know who is live) and also add the misbehaving processor to a local or global temporary black-list.
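The selection-and-retry procedure above can be sketched as a single function. This is a minimal illustration under stated assumptions: `live` stands in for the membership view (the kept collection), and `send` is a hypothetical RPC that returns True only when the processor confirms the document was accepted and processed, and False on timeout, connection loss, or any other failure. The names `dispatch`, `send` and `max_attempts` are illustrative, not part of KeptCollections.

```python
import random
from typing import AbstractSet, Callable, Optional, Set


def dispatch(doc: str,
             live: AbstractSet[str],
             send: Callable[[str, str], bool],
             blacklist: Set[str],
             rng: random.Random,
             max_attempts: int = 10) -> Optional[str]:
    """Hand `doc` to a random live processor until one confirms it.

    Returns the processor that confirmed, or None if no candidate is left
    or the attempt budget runs out.
    """
    for _ in range(max_attempts):
        # Re-read membership on every attempt; it may have changed.
        candidates = sorted(live - blacklist)
        if not candidates:
            return None
        target = rng.choice(candidates)
        if send(target, doc):
            return target
        # No confirmation: treat this processor as suspect for now.
        blacklist.add(target)
    return None
```

The blacklist is meant to be temporary; clearing it periodically or whenever the membership view changes keeps a processor that recovered from being starved of work forever.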
If nothing ever goes down, this does a pretty much perfect job. Adding a node also works perfectly. Taking a node out in an orderly way can also work seamlessly if the node first removes itself from the collection of live processors, then finishes all pending requests and waits a few seconds to see if any more requests come through.
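The orderly removal described above (deregister first, then drain, then linger) can be sketched as follows. This is a hypothetical single-process model: the shared `live` set stands in for the collection of live processors, `pending` holds requests already received, and `grace_seconds` is the "few seconds" of waiting for senders still working from a stale view.

```python
import time
from collections import deque
from typing import Deque, List, Set


class Processor:
    """Hypothetical processor that leaves the group in an orderly way."""

    def __init__(self, name: str, live: Set[str]) -> None:
        self.name = name
        self.live = live  # shared membership set (stand-in for the collection)
        self.pending: Deque[str] = deque()
        self.done: List[str] = []
        live.add(name)

    def handle(self, doc: str) -> None:
        self.done.append(doc)

    def shutdown(self, grace_seconds: float, poll: float = 0.01) -> None:
        # 1. Deregister first, so dispatchers stop choosing this node.
        self.live.discard(self.name)
        # 2. Drain queued requests, and keep draining for a grace period to
        #    catch requests from senders whose membership view is stale.
        deadline = time.monotonic() + grace_seconds
        while True:
            while self.pending:
                self.handle(self.pending.popleft())
            if time.monotonic() >= deadline:
                break
            time.sleep(poll)
```

The ordering is the point of the design: draining before deregistering would leave a window in which new requests keep arriving at a node that is about to stop listening.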
In the presence of failures, you have a few (inevitable) problems. For instance, a node can fail in the tiny window between your seeing that the node exists and sending it a request. That shouldn't be much of a problem, since the connection will fail. Another mild problem is that a node may fail in such a way that ZK takes 30 seconds or so to be sure that it is gone. Again, connection failure will let this be handled gracefully.
The only serious error here is when you send a document to a processor that successfully handles the document and commits it downstream, but then fails before reporting the success to you. This will lead to a double commit of the processed document.
Without a reliable central repository, it is impossible to avoid some error like this.
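The double-commit case can be reproduced in a few lines: retrying whenever a confirmation is missing gives at-least-once delivery, so a processor that committed downstream but died before replying causes a second commit by the next processor. `Downstream`, `process` and `send_until_confirmed` are hypothetical names; `failures` simply scripts whether each successive attempt's processor dies before replying.

```python
from typing import List


class Downstream:
    """Hypothetical downstream store that records every commit it receives."""

    def __init__(self) -> None:
        self.commits: List[str] = []

    def commit(self, doc: str) -> None:
        self.commits.append(doc)


def process(doc: str, downstream: Downstream, dies_before_reply: bool) -> bool:
    """Commit downstream, then reply; returns whether the sender hears back."""
    downstream.commit(doc)
    if dies_before_reply:
        return False  # the work is committed, but the sender never learns it
    return True


def send_until_confirmed(doc: str, downstream: Downstream,
                         failures: List[bool]) -> int:
    """Retry on a missing reply, as in Ted's scheme; returns attempts made."""
    attempts = 0
    for dies in failures:
        attempts += 1
        if process(doc, downstream, dies):
            break
    return attempts
```

With `failures=[True, False]` the document is committed twice; from the sender's side, a crash before the commit and a crash after it look identical, which is why this cannot be avoided without some reliable shared record of what was committed.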
-
Re: Using ZK for real-time group membership notification
Ted Dunning 2011-03-19, 15:56
S4 is generally designed as a best-effort system that doesn't provide any guarantees. Loss of a PE (processing element) can cause loss of data.
-
Re: Using ZK for real-time group membership notification
Ted Dunning 2011-03-19, 15:57
I have heard pretty good words about RabbitMQ as well. Message queues definitely are a great way to go. They do have limits with regard to double processing.
-
RE: Using ZK for real-time group membership notification
Fournier, Camille F. [Tec... 2011-03-21, 02:38
To do this right you probably need message queues. I'd research the various MQ solutions out there; they are built to handle exactly this sort of issue. You could try to implement it via a ZK distributed queue plus some sort of crazy transaction logic in each process (so that a document being processed would return to the queue if the processor somehow didn't commit the transaction before dying, etc.), but frankly it sounds like a nightmare, even if you're comfortable with occasionally just losing documents.

> But say I'm OK with "eventually everyone converging". Can I use ZK then? And
> if so, how "eventually" is this "eventually"? That is, if an app dies, how
> quickly can ZK notify all znode watchers that znode change? A few milliseconds
> or more?

If your app dies, it will live on in ZK until the session timeout is reached (usually on the order of seconds), unless we implement better session-death detection logic.

C
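How long a crashed member "lives on" is governed by its negotiated session timeout. The ZooKeeper server clamps the timeout a client requests into [minSessionTimeout, maxSessionTimeout], which default to 2 and 20 times `tickTime`; with a commonly used `tickTime` of 2000 ms that is 4 to 40 seconds, consistent with the "30 seconds or so" and "seconds magnitude" figures in this thread. The function below is a sketch of that documented rule, not ZooKeeper's own code; the function and parameter names are illustrative.

```python
from typing import Optional


def negotiated_session_timeout_ms(requested_ms: int,
                                  tick_time_ms: int = 2000,
                                  min_session_timeout_ms: Optional[int] = None,
                                  max_session_timeout_ms: Optional[int] = None) -> int:
    """Clamp a requested session timeout into the server's allowed range.

    Unless configured, minSessionTimeout defaults to 2 * tickTime and
    maxSessionTimeout defaults to 20 * tickTime.
    """
    lo = (min_session_timeout_ms if min_session_timeout_ms is not None
          else 2 * tick_time_ms)
    hi = (max_session_timeout_ms if max_session_timeout_ms is not None
          else 20 * tick_time_ms)
    return max(lo, min(requested_ms, hi))
```

This bounds only how long a crashed member's ephemeral znode can linger; watchers still need time to be notified and re-read the children afterwards. A member that closes its session cleanly has its ephemeral znodes removed when the close is processed, without waiting for the timeout.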