Kafka >> mail # user >> Purgatory


Re: Purgatory
Marc, thanks for writing that up. I think it is worth adding some
details on the request purgatory to a wiki (Jay started a wiki page
for Kafka internals [1] a while ago, but we have not had time to add
much to it since then). Your write-up could be reviewed and added
there. Do you have edit permissions on the wiki?

As for the purge interval config - yes, the documentation can be
improved a bit. It's one of those "internal" configs that generally
don't need to be modified by users. The reason we added it is as
follows:
- We found that for low-volume topics, replica fetch requests were
getting expired but were still sitting around in purgatory.
- This was because we were expiring them from the delay queue (used
to track when requests should expire), but they were still sitting in
the watcherFor map - i.e., they would only get purged when the next
producer request to that topic/partition arrived, but for low-volume
topics this could take a long time (or never happen in the worst
case), and we would eventually run into an OOME.
- So we needed to periodically go through the entire watcherFor map
and explicitly remove the requests that had expired (a rough sketch
of that sweep follows this list).
- More details on this are in KAFKA-664.
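
In a rough Scala sketch, that sweep looks something like the
following (illustrative names only, not the actual implementation):

  import java.util.concurrent.atomic.AtomicBoolean
  import scala.collection.mutable

  // Illustrative sketch only - names simplified from the real code.
  class DelayedReq(val key: String) {
    val satisfied = new AtomicBoolean(false)
  }

  class Purgatory(purgeInterval: Int) {
    private val watchersForKey =
      mutable.Map.empty[String, mutable.ListBuffer[DelayedReq]]
    private var watchedSincePurge = 0

    def watch(r: DelayedReq): Unit = {
      watchersForKey.getOrElseUpdate(r.key, mutable.ListBuffer.empty) += r
      watchedSincePurge += 1
      if (watchedSincePurge >= purgeInterval) {
        // Sweep the whole map so satisfied/expired requests on
        // low-volume keys cannot accumulate and cause an OOME.
        watchersForKey.values.foreach(_.filterInPlace(x => !x.satisfied.get))
        watchedSincePurge = 0
      }
    }
  }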

Thanks,

Joel

[1] https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Internals

On Fri, Nov 1, 2013 at 12:33 PM, Marc Labbe <[EMAIL PROTECTED]> wrote:
> Guozhang,
>
> I have to agree with Priya that the doc isn't very clear. Although the
> configuration is documented, the description simply rewords the name of
> the config, which isn't particularly useful if you want more information
> about what the purgatory is. I searched the whole wiki and docs and could
> not find anything very useful, as opposed to looking at the code. In this
> case, kafka.server.KafkaApis and kafka.server.RequestPurgatory will be
> your friends.
>
> I'll try to add to Joe's answer here, mostly just reporting what's
> available in the Scala docs from the project. I am doing this to
> understand the mechanics myself, btw.
>
> As Joe said, messages are not dropped by the purgatory but simply removed
> from the purgatory when they are satisfied. The satisfaction conditions
> differ between fetch and produce requests, and each is implemented in its
> respective DelayedRequest subclass (DelayedFetch and DelayedProduce),
> sketched below.
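>
> To give a feel for the shape of it, here is a loose sketch (my own
> simplified names, not the actual classes):
>
>   trait DelayedRequest {
>     def isSatisfied: Boolean  // type-specific satisfaction test
>   }
>
>   class DelayedFetch(minBytes: Int, availableBytes: () => Int)
>       extends DelayedRequest {
>     // A fetch is satisfied once enough data has accumulated.
>     def isSatisfied: Boolean = availableBytes() >= minBytes
>   }
>
>   class DelayedProduce(requiredAcks: Int, acksReceived: () => Int)
>       extends DelayedRequest {
>     // A produce is satisfied once enough replicas have acked.
>     def isSatisfied: Boolean = acksReceived() >= requiredAcks
>   }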
>
> Request purgatories are described as follows in the code:
>  - ProducerRequestPurgatory: A holding pen for produce requests waiting
> to be satisfied.
>  - FetchRequestPurgatory: A holding pen for fetch requests waiting to be
> satisfied.
>
> Each request purgatory runs a thread (ExpiredRequestReaper). This thread
> first tries to find an expired delayed request. When one is found, it
> runs the purgatory's expire method to handle the delayed request's
> expiration. In both the produce and fetch cases, it sends a response to
> the client; an expired request is also marked as satisfied. The next step
> of the thread's loop involves the configuration parameter you asked about
> initially (purgatory.purge.interval.requests). When the number of delayed
> requests given to the purgatory to watch reaches this value, it goes
> through all previously queued requests and removes those which are marked
> as satisfied. Because of that, it is really an interval more than a
> threshold, since it doesn't actually care about the number of satisfied
> requests or the size of the queue.
>
> Producer request
> - When is it added to purgatory (delayed)? (the three conditions are
> sketched together after this list):
>   * when it uses ack=-1 (actually, the code tells me anything other than
> 0 or 1); producer config: request.required.acks
>   * when the partitions have more than one replica (with a single
> replica, ack=-1 is no different from ack=1, so a delayed request wouldn't
> make much sense)
>   * when not all partitions are in error
> - When does it expire? When it reaches the timeout defined in the produce
> request (ackTimeoutMs), which translates from the producer config
> request.timeout.ms.
> - What happens (on the broker) when it expires? It sends a response to
> the client; the response content depends on the request, of course.
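>
> Putting those three conditions together (a simplified sketch with my
> own names, paraphrasing kafka.server.KafkaApis):
>
>   object ProduceDelayCheck {
>     def shouldDelay(requiredAcks: Short,
>                     numReplicas: Int,
>                     allPartitionsInError: Boolean): Boolean =
>       requiredAcks != 0 && requiredAcks != 1 &&  // effectively ack=-1
>       numReplicas > 1 &&                         // else -1 acts like 1
>       !allPartitionsInError                      // else respond at once
>   }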
