|
Tom Brown
2012-10-25, 21:44
Neha Narkhede
2012-10-26, 01:19
Philip O'Toole
2012-10-26, 01:31
Tom Brown
2012-10-26, 02:04
Evan chan
2012-10-26, 02:58
Jun Rao
2012-10-26, 04:15
Tom Brown
2012-10-26, 05:00
Jun Rao
2012-10-26, 14:24
Tom Brown
2012-10-26, 14:29
Jay Kreps
2012-10-26, 18:08
Jay Kreps
2012-10-26, 18:18
Jay Kreps
2012-10-26, 18:29
Jason Rosenberg
2012-10-26, 18:29
Guozhang Wang
2012-10-26, 18:31
Jay Kreps
2012-10-26, 18:47
Jun Rao
2012-10-29, 03:09
Jun Rao
2012-10-29, 05:31
Rohit Prasad
2012-11-02, 22:11
Tom Brown
2012-11-02, 23:12
Rohit Prasad
2012-11-03, 18:51
Milind Parikh
2012-11-03, 19:53
Jun Rao
2013-03-28, 15:23
|
-
Transactional writing Tom Brown 2012-10-25, 21:44
Is there an accepted, or recommended way to make writes to a Kafka queue idempotent, or within a transaction?

I can configure my system such that each queue has exactly one producer.

(If there are no accepted/recommended ways, I have a few ideas I would like to propose. I would also be willing to implement them if needed.)

Thanks in advance!

--Tom
-
Re: Transactional writing Neha Narkhede 2012-10-26, 01:19
The closest concept to a transaction on the publisher side that I can think of is sending a batch of messages in a single call to the synchronous producer.

Specifically, you can configure a Kafka producer to use the "sync" mode and batch the messages that require transactional guarantees into a single send() call. That will ensure that either all the messages in the batch are sent or none are.

Thanks,
Neha
-
Re: Transactional writing Philip O'Toole 2012-10-26, 01:31
On Thu, Oct 25, 2012 at 06:19:04PM -0700, Neha Narkhede wrote:
> The closest concept of transaction on the publisher side, that I can
> think of, is using batch of messages in a single call to the
> synchronous producer.
>
> Precisely, you can configure a Kafka producer to use the "sync" mode
> and batch messages that require transactional guarantees in a
> single send() call. That will ensure that either all the messages in
> the batch are sent or none.

This is an interesting feature -- something I wasn't aware of. Still, it doesn't solve the problem *completely*. As many people realise, it's still possible for the batch of messages to get into Kafka fine, but for the ack from Kafka to be lost on its way back to the Producer. In that case the Producer erroneously believes the messages didn't get in, and might re-send them.

You guys *haven't* solved that issue, right? I believe you write about it on the Kafka site.

--
Philip O'Toole
Senior Developer
Loggly, Inc.
San Francisco, Calif.
www.loggly.com
Come join us! http://loggly.com/company/careers/
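The lost-ack case described above can be reproduced with a small in-memory model. This is only an illustrative sketch: `ToyBroker`, `AckLost`, `send_with_retry`, and the `drop_acks` knob are hypothetical names invented here, not part of any Kafka API.

```python
class AckLost(Exception):
    """Raised when the broker stored the batch but the ack never arrived."""


class ToyBroker:
    """Hypothetical in-memory stand-in for a single partition log."""

    def __init__(self, drop_acks=0):
        self.log = []
        self.drop_acks = drop_acks  # how many acks to "lose" before behaving

    def append(self, batch):
        self.log.extend(batch)      # the write itself succeeds...
        if self.drop_acks > 0:
            self.drop_acks -= 1
            raise AckLost()         # ...but the producer never hears about it
        return len(self.log)


def send_with_retry(broker, batch, max_attempts=3):
    """Naive producer: resend the whole batch whenever no ack is received."""
    for _ in range(max_attempts):
        try:
            return broker.append(batch)
        except AckLost:
            continue
    raise RuntimeError("gave up after %d attempts" % max_attempts)


broker = ToyBroker(drop_acks=1)
send_with_retry(broker, ["m1", "m2"])
print(broker.log)  # ['m1', 'm2', 'm1', 'm2']
```

The batch is atomic on each attempt, yet the log still ends up with two copies, which is why batching alone does not give idempotence.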
-
Re: Transactional writing Tom Brown 2012-10-26, 02:04
I have come up with two different possibilities, both with different trade-offs.

The first would be to support "true" transactions by writing transactional data into a temporary file and then copying it directly to the end of the partition when the commit command is received. The upside to this approach is that individual transactions can be larger than a single batch, and more than one producer could conduct transactions at once. The downside is the extra IO involved in writing the data to disk and reading it back an extra time.

The second would be to allow any number of messages to be appended to a topic, but not move the "end of topic" offset until the commit was received. If a rollback was received, or the producer timed out, the partition could be truncated at the most recently recognized "end of topic" offset. The upside is that there is very little extra IO (only to store the official "end of topic" metadata), and it seems like it should be easy to implement. The downside is that this "transaction" feature is incompatible with anything but a single producer per partition.

I am interested in your thoughts on these.

--Tom
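To make the two proposals easier to compare, here is a minimal in-memory sketch of each. The class and method names (`StagedLog`, `WatermarkLog`, `commit`, `rollback`) are hypothetical and chosen for illustration only; a real broker would deal with files, timeouts, and concurrency that are ignored here.

```python
class StagedLog:
    """Proposal 1: buffer a transaction off to the side, copy it in on commit."""

    def __init__(self):
        self.log = []
        self.staged = {}  # transaction id -> messages not yet in the log

    def append(self, txn_id, message):
        self.staged.setdefault(txn_id, []).append(message)

    def commit(self, txn_id):
        # The extra copy here corresponds to the extra IO in the proposal.
        self.log.extend(self.staged.pop(txn_id, []))

    def rollback(self, txn_id):
        self.staged.pop(txn_id, None)


class WatermarkLog:
    """Proposal 2: append in place, advance the visible end only on commit."""

    def __init__(self):
        self.log = []
        self.committed_end = 0  # the "end of topic" offset readers may reach

    def append(self, message):
        self.log.append(message)

    def commit(self):
        self.committed_end = len(self.log)

    def rollback(self):
        del self.log[self.committed_end:]  # truncate the uncommitted tail

    def read(self):
        return self.log[:self.committed_end]


staged = StagedLog()
staged.append("a", "a1")
staged.append("b", "b1")   # a second producer can interleave freely
staged.append("a", "a2")
staged.commit("a")
staged.rollback("b")
print(staged.log)          # ['a1', 'a2']

wm = WatermarkLog()
wm.append("x1")
wm.commit()
wm.append("x2")
print(wm.read())           # ['x1']
wm.rollback()
print(wm.log)              # ['x1']
```

Note that `WatermarkLog.rollback` truncates everything after the last commit, which is only safe if a single producer owns the partition, matching the downside stated for the second proposal.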
-
Re: Transactional writing Evan chan 2012-10-26, 02:58
A third possibility is to use a different storage backend, like Cassandra, which can easily support idempotent writes. You would hash the unique message ID and timestamp into row and column keys.

Note that this scheme would possibly allow using it as a priority queue.

-Evan
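One way to read the keyed-write idea above is sketched below with a plain dictionary standing in for the storage backend. The key layout (row = hash bucket of the message ID, column = timestamp plus message ID), the bucket count, and the names `KeyedStore`, `row_key`, and `scan` are assumptions made for illustration; no Cassandra API is used.

```python
import hashlib

NUM_ROWS = 16  # hypothetical number of row buckets


def row_key(message_id):
    """Map a message ID to a stable row bucket."""
    digest = hashlib.sha1(message_id.encode("utf-8")).hexdigest()
    return int(digest, 16) % NUM_ROWS


class KeyedStore:
    """Toy row/column store: writing the same cell twice is a no-op."""

    def __init__(self):
        self.rows = {}  # row key -> {column key -> payload}

    def write(self, message_id, timestamp, payload):
        column = (timestamp, message_id)
        self.rows.setdefault(row_key(message_id), {})[column] = payload

    def scan(self):
        """Return payloads ordered by timestamp, then by message ID."""
        cells = [(col, val) for row in self.rows.values() for col, val in row.items()]
        return [val for col, val in sorted(cells, key=lambda cell: cell[0])]


store = KeyedStore()
store.write("id-1", 100, "hello")
store.write("id-2", 50, "world")
store.write("id-1", 100, "hello")  # duplicate resend lands on the same cell
print(store.scan())  # ['world', 'hello']
```

Because the resend targets the same row and column, it overwrites rather than duplicates, and ordering columns by timestamp is what would make a priority-queue style read possible.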
-
Re: Transactional writing Jun Rao 2012-10-26, 04:15
Even if you have transaction support, the same problem exists. If the client dies before receiving the ack, it cannot be sure whether the broker really committed the data or not.

To address this issue, the client can save the offset of committed messages periodically. On restart from a crash, it first reads all messages after the last saved offset. It then knows whether the last message was committed or not and can decide whether the message should be resent. This probably only works for a single producer.

Thanks,

Jun
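The recovery procedure described above can be sketched as a small function. This assumes a single producer per partition and that each message is uniquely identifiable (for example, it carries its own ID); `recover`, `saved_offset`, and `pending` are hypothetical names used only for this illustration.

```python
def recover(log, saved_offset, pending):
    """Return the messages from `pending` that still need to be resent.

    log          -- the partition contents as read back from the broker
    saved_offset -- last offset the producer checkpointed before crashing
    pending      -- messages the producer had sent, or was about to send,
                    after that checkpoint
    """
    # Everything after the checkpoint may or may not have been acknowledged,
    # so read it back and see what actually made it into the log.
    already_written = set(log[saved_offset:])
    return [message for message in pending if message not in already_written]


log = ["m1", "m2", "m3"]   # m3 was written, but its ack was never seen
saved_offset = 2           # checkpoint taken after m2
pending = ["m3", "m4"]

print(recover(log, saved_offset, pending))  # ['m4']
```

With more than one producer writing to the same partition, the tail of the log would contain other producers' messages too, and matching by position alone would no longer be enough.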
-
Re: Transactional writing Tom Brown 2012-10-26, 05:00
How do other systems deal with that? If I send "commit" to Oracle, but my connection dies before I get the ack, is the data committed or not?

What about the other case? If I send "commit" to Oracle, but the server dies before I get the ack, is the data committed or not?

In either case, how can I tell?

--Tom
-
Re: Transactional writing Jun Rao 2012-10-26, 14:24
In a database, if you know the key of the last record that you are writing, you can do a read to see whether your last change was actually made. However, if you use keys auto-generated by the server and are doing an insert, you don't even know the key on failure. So, in general, this is an unsolved problem and needs to be handled at the application level. Most applications will just resend and deal with the consequences of potential duplicates.

Thanks,

Jun
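The read-back check for a client-chosen key can be illustrated with Python's built-in `sqlite3` module. The table name, column names, and the helpers `write_was_applied` and `safe_insert` are made up for this sketch; the point is only that a known key lets the client ask "did my write land?" before retrying.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id TEXT PRIMARY KEY, body TEXT)")


def write_was_applied(conn, key):
    """Read back by key to see whether an earlier attempt already committed."""
    row = conn.execute("SELECT 1 FROM events WHERE id = ?", (key,)).fetchone()
    return row is not None


def safe_insert(conn, key, body):
    """Insert only if a previous, unacknowledged attempt did not already land."""
    if not write_was_applied(conn, key):
        conn.execute("INSERT INTO events (id, body) VALUES (?, ?)", (key, body))
        conn.commit()


safe_insert(conn, "order-42", "created")
safe_insert(conn, "order-42", "created")  # retry after a presumed lost ack
print(conn.execute("SELECT COUNT(*) FROM events").fetchone()[0])  # 1
```

With a server-generated key there is nothing to pass to `write_was_applied`, which is the case where the application has to accept or filter duplicates itself.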
-
Re: Transactional writing Tom Brown 2012-10-26, 14:29
That was exactly the conclusion I arrived at, which is why I don't want to worry too much about that case.

Back to the original two proposals-- If I'm going to attempt to implement one or the other, do any of you who are familiar with the code want to offer advice as to which is preferred (or if no preference, which would be easier to implement)?

--Tom
-
Re: Transactional writing Jay Kreps 2012-10-26, 18:08
This is an important feature and I am interested in helping out in the design and implementation, though I am working on 0.8 features for the next month so I may not be of too much use. I have thought a little bit about this, but I am not yet sure of the best approach.

Here is a specific use case I think is important to address: consider a case where you are doing processing of one or more streams and producing an output stream. This processing may involve some kind of local state (say counters or other local aggregation intermediate state). This is a common scenario. The problem is to give reasonable semantics to this computation in the presence of failures. The processor effectively has a position/offset in each of its input streams as well as whatever local state it holds. The problem is that if this process fails it needs to restore to a state that matches the last produced messages.

There are several solutions to this problem. One is to make the output somehow idempotent; this will solve some cases but is not a general solution, as many things cannot be made idempotent easily.

I think the two proposals you give outline a couple of basic approaches:
1. Store the messages on the server somewhere but don't add them to the log until the commit call
2. Store the messages in the log but don't make them available to the consumer until the commit call
Another option you didn't mention:

I can give several subtleties to these approaches. One advantage of the second approach is that messages are in the log and can be available for reading or not. This makes it possible to support a kind of "dirty read" that allows the consumer to specify whether they want to immediately see all messages with low latency but potentially see uncommitted messages, or only see committed messages.

The problem with the second approach, at least in the way you describe it, is that you have to lock the log until the commit occurs, otherwise you can't roll back (because someone else may have appended their own messages and you can't truncate the log). This would have all the problems of remote locks. I think this might be a deal-breaker.

Another variation on the second approach would be the following: have each producer maintain an id and generation number. Keep a schedule of valid offset/id/generation numbers on the broker and only hand these out. This solution would support non-blocking multi-writer appends but requires more participation from the producer (i.e. getting a generation number and id).

Cheers,

-Jay
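The "dirty read" option mentioned above can be pictured as a flag on the consumer's read call. The sketch below is a toy single-writer log; `ReadableLog` and the `include_uncommitted` parameter are hypothetical names, and the locking and multi-writer concerns raised in the message are deliberately left out.

```python
class ReadableLog:
    """Toy log where uncommitted messages sit past a committed-end marker."""

    def __init__(self):
        self.messages = []
        self.committed_end = 0

    def append(self, message):
        self.messages.append(message)

    def commit(self):
        self.committed_end = len(self.messages)

    def read(self, from_offset=0, include_uncommitted=False):
        # A dirty read sees the whole log; a committed read stops at the marker.
        end = len(self.messages) if include_uncommitted else self.committed_end
        return self.messages[from_offset:end]


log = ReadableLog()
log.append("a")
log.commit()
log.append("b")  # appended but not yet committed

print(log.read())                          # ['a']
print(log.read(include_uncommitted=True))  # ['a', 'b']
```

The low-latency consumer accepts that "b" might later be rolled back, while the committed-only consumer waits until the marker moves.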
-
Re: Transactional writing Jay Kreps 2012-10-26, 18:18
There are a few oddities of using the batching feature to get a kind of transactionality (that wasn't the original intention):
1. It only actually works if you enable compression. Currently I don't think we allow uncompressed recursive message batches. Without this, the batching only protects against producer failure; in the case of broker failure there is no guarantee (either with or without replication) that you won't fail in the middle of writing the batch. We could consider separating out the batching from the compression support to make this work in a more sane way.
2. It doesn't work across partitions or topics.

-Jay
-
Re: Transactional writing Jay Kreps 2012-10-26, 18:29
Yes, I don't think any single-hop request/response protocol can avoid this problem when performing general mutations on a remote system. This includes essentially all common usage of web services, databases, messaging systems, etc. That is to say, if the client gets a socket-related error when performing a remote mutation operation, the client doesn't know if the mutation occurred or not.

There are consensus-type protocols that resolve this kind of problem in the general case by allowing many participants to come into agreement on whether or not the mutation took place, but these have a number of drawbacks as a client-facing protocol and mostly aren't used for this purpose.

I think in practice most people rely on application logic (making updates idempotent, deduplicating on the client, etc).

-Jay
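The two kinds of application logic named above, idempotent updates and client-side deduplication, can be sketched in a few lines. This assumes every message carries a unique ID chosen by the sender; `deduplicate`, `apply_set`, and `apply_increment` are hypothetical helpers written for this illustration.

```python
def deduplicate(messages, seen=None):
    """Yield each (message_id, payload) pair once, skipping repeated IDs."""
    seen = set() if seen is None else seen
    for message_id, payload in messages:
        if message_id in seen:
            continue
        seen.add(message_id)
        yield message_id, payload


def apply_set(state, key, value):
    """Idempotent update: applying it twice leaves the same state."""
    state[key] = value


def apply_increment(state, key, amount):
    """Non-idempotent update: a duplicate changes the result."""
    state[key] = state.get(key, 0) + amount


received = [("id-1", 5), ("id-2", 7), ("id-1", 5)]  # id-1 was resent

set_state, inc_state, dedup_state = {}, {}, {}
for message_id, value in received:
    apply_set(set_state, message_id, value)
    apply_increment(inc_state, "total", value)
for message_id, value in deduplicate(received):
    apply_increment(dedup_state, "total", value)

print(set_state)    # {'id-1': 5, 'id-2': 7}
print(inc_state)    # {'total': 17}
print(dedup_state)  # {'total': 12}
```

The duplicate is harmless for the set-style update, inflates the running total, and is filtered out once the consumer tracks the IDs it has already seen.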
-
Re: Transactional writing Jason Rosenberg 2012-10-26, 18:29
Correct me if I'm wrong, but I thought the intention of Kafka was not really to handle this use case (e.g. transactional writing or guaranteed delivery semantics). Why wouldn't you use a JMS queue system (e.g. ActiveMQ) if you need transactional messaging, backed by a database, etc.?

Jason
-
RE: Transactional writing Guozhang Wang 2012-10-26, 18:31
I am also quite interested in this thread, and I have another question here to ask about committing consumed messages. For example, if I need a program which acts both as a consumer and a producer, and the actions are wrapped in a "transaction":

Transaction start:
Get next message from broker A;
Do something;
Send a message to broker B;
Commit.

If the transaction aborts after reading the message from broker A, is it possible to logically "put the message back" to brokers? I remember that Amazon Queue Service uses some sort of lease mechanism, which might work for this case. But I am afraid that will affect the throughput a lot.

--Guozhang
-
Re: Transactional writing Jay Kreps 2012-10-26, 18:47
Yeah that is a good question. I think we started with good throughput and scalability and are then adding in the features we can without breaking these things or over-complicating the design. For example in 0.8 we have *much* better delivery guarantees than most messaging systems (imo).

The use case I gave is my motivation for being interested in this. Giving good semantics to incremental processing is an important use case and one that often goes along with high throughput requirements (which is why you wouldn't just use a traditional RDBMS).

I think the question here is whether there is an implementation of transactions that works well in our design and doesn't kill performance.

-Jay
-
Re: Transactional writing Jun Rao 2012-10-29, 03:09
For this particular use case, you can potentially include the message offset from broker A in the message sent to broker B. If the transaction fails, you read the last message from broker B and use the included offset to resume the consumer from broker A. This assumes that there is only a single client writing to broker B (for a particular partition).

Thanks,

Jun
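Jun's suggestion of carrying the broker A offset inside each message written to broker B can be sketched as follows. This is only an in-memory illustration: the two Python lists stand in for single-partition logs, and `process`, `resume_offset`, `run` and the `source_offset` field are made-up names, not Kafka APIs. It also assumes exactly one writer to the output log, as Jun notes.

```python
import json


def process(value):
    # Stand-in for the "do something" step.
    return value.upper()


def resume_offset(output_log):
    """Return the next input offset to consume, based on the last output message."""
    if not output_log:
        return 0
    last = json.loads(output_log[-1])
    return last["source_offset"] + 1


def run(input_log, output_log, crash_after=None):
    """Consume from input_log, writing results tagged with their source offset."""
    start = resume_offset(output_log)
    for offset in range(start, len(input_log)):
        if crash_after is not None and offset > crash_after:
            raise RuntimeError("simulated crash")
        record = {"source_offset": offset, "value": process(input_log[offset])}
        output_log.append(json.dumps(record))


broker_a = ["a", "b", "c", "d"]
broker_b = []

try:
    run(broker_a, broker_b, crash_after=1)  # handles offsets 0 and 1, then crashes
except RuntimeError:
    pass

run(broker_a, broker_b)  # restart: resumes from offset 2
values = [json.loads(m)["value"] for m in broker_b]
```

Because the resume position is derived from the output log itself, there is no separate offset store that could fall out of step with what was actually produced.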
-
Re: Transactional writing Jun Rao 2012-10-29, 05:31
If you use Kafka just as a redo log, you can't undo anything that's written to the log. Write-ahead logs in typical database systems are both redo and undo logs. Transaction commits and rollbacks are implemented on top of the logs. However, general-purpose write-ahead logs for transactions are much more complicated.

Thanks,

Jun
-
Re: Transactional writing Rohit Prasad 2012-11-02, 22:11
Getting transactional support is quite a hard problem. There will always be corner cases where the solution will not work, unless you want to go down the path of 2PC, Paxos, etc., which of course will degrade Kafka's performance. It is best to reconcile data and deal with duplicate messages in the application layer. Having said that, it would be amazing if we could build "at most once" semantics in Kafka!!

Regarding the above approaches: the producer will always be in doubt about whether its commit went through, i.e. if the ack for "commit" is not received by the producer, or if the producer dies immediately after calling commit. When it is restarted, how does it know whether the last operation went through?

I suggest the following:
1. The producer should attach a timestamp at the beginning of each message and send it to the server.
2. On restarts/timeouts/re-connections, the producer should first read the last committed message from the leader of the partition.
3. From the timestamp, it can know how many messages went through before it died (or the connection was broken), and it can infer how many messages to replay.

The above approach can be used with existing Kafka libraries, since you can have a producer and a consumer thread together in an application to implement this logic. Or someone can take the initiative to write a transactional producer (which internally has both a producer and a consumer to read the last committed message). I will be developing one for Kafka 0.8 in C++.

The above approach will work even if you batch messages for a single partition. It will work only if a single producer is writing to a partition. I want to hear opinions about the above approach. I am sure there can be corner cases where it may break.

If there are multiple producers to a partition, then some bookkeeping on the server side with regard to the last message committed from a "correlation id" (to identify a unique producer) may be needed.

Regards,
Rohit
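The three-step timestamp scheme Rohit proposes might look roughly like this in Python. It is a sketch under several assumptions that the post does not spell out: timestamps are strictly increasing per producer, the producer keeps its unacknowledged messages in a durable local buffer, and only one producer writes to the partition. The function name `messages_to_replay` and the list-of-tuples log are illustrative only.

```python
def messages_to_replay(pending, partition_log):
    """Return the pending (timestamp, payload) pairs not yet in the partition."""
    if not partition_log:
        return list(pending)
    # Step 2: read the last committed message and take its timestamp.
    last_committed_ts = partition_log[-1][0]
    # Step 3: anything stamped later than that never made it into the log.
    return [(ts, payload) for ts, payload in pending if ts > last_committed_ts]


# Messages the producer had buffered before it died.
pending = [(101, "x"), (102, "y"), (103, "z")]
# What actually reached the partition: the ack for 102 was lost.
partition_log = [(100, "w"), (101, "x"), (102, "y")]

replay = messages_to_replay(pending, partition_log)
partition_log.extend(replay)
```

If two messages could share a timestamp, a per-producer sequence number would be a safer marker than wall-clock time.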
-
Re: Transactional writing Tom Brown 2012-11-02, 23:12
That approach allows a producer to prevent duplicate messages to the partition, but what about the consumer? In my case, I don't want the consumer to be able to read any of the messages unless it can read all of the messages from a transaction.

I also like the idea of there being multiple types of Kafka transaction, though, just to accommodate different performance, reliability, and consumption patterns. Of course, the added complexity of that might just sink the whole thing.

--Tom
-
Re: Transactional writing Rohit Prasad 2012-11-03, 18:51
I agree that this approach only prevents duplicate messages to the partition from the producer side. There needs to be a similar approach on the consumer side too. Using ZK can be one solution, or other non-ZK approaches.

Even if the consumer reads either none or all of the messages of a transaction, that does not solve the transaction problem yet, because the business/application logic inside the consumer thread may execute partially and fail. So it becomes tricky to decide the point at which you want to say that you have "consumed" the message and increase the consumption offset. If your consumer thread is saving some value into DB/HDFS/etc., ideally you want this save operation and the consumption offset increment to happen atomically. That's why it boils down to application logic implementing transactions and dealing with duplicates. Maybe a journalling or redo log approach on the consumer side can help build such a system.

It will be nice if eventually Kafka can be a transport which provides "exactly once" semantics for message delivery. Then consumer threads can be sure that they receive messages once, and can build application logic on top of that.

I have a use case similar to what Jay mentioned in a previous mail. I want to do aggregation but want the aggregated data to be correct, possibly avoiding duplicates in case of failures/crashes.
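One way to get the "save operation and consumption offset incremented atomically" behaviour Rohit describes is to keep both in the same transactional store. The sketch below uses Python's standard `sqlite3` module as a stand-in for the DB; the table layout, the `open_store`, `get` and `consume` helpers, and the list used as the input log are all assumptions made for illustration.

```python
import sqlite3


def open_store(path=":memory:"):
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS state (name TEXT PRIMARY KEY, value INTEGER)")
    conn.execute("INSERT OR IGNORE INTO state VALUES ('total', 0)")
    conn.execute("INSERT OR IGNORE INTO state VALUES ('next_offset', 0)")
    conn.commit()
    return conn


def get(conn, name):
    return conn.execute("SELECT value FROM state WHERE name = ?", (name,)).fetchone()[0]


def consume(conn, log, fail_at=None):
    """Aggregate log entries, committing the running total and offset together."""
    for offset in range(get(conn, "next_offset"), len(log)):
        try:
            with conn:  # commits on success, rolls back if the block raises
                conn.execute(
                    "UPDATE state SET value = value + ? WHERE name = 'total'",
                    (log[offset],),
                )
                if offset == fail_at:
                    raise RuntimeError("simulated crash mid-transaction")
                conn.execute(
                    "UPDATE state SET value = ? WHERE name = 'next_offset'",
                    (offset + 1,),
                )
        except RuntimeError:
            return False
    return True


conn = open_store()
log = [5, 7, 11]
consume(conn, log, fail_at=1)  # offset 0 commits; offset 1 is rolled back
consume(conn, log)             # restart: offset 1 is applied exactly once
```

The same idea carries over to any store that supports multi-row transactions; it does not help when the side effect lives in a system that cannot share a transaction with the offset.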
-
Re: Transactional writingMilind Parikh 2012-11-03, 19:53
Why wouldn't the storm approach provide semantics of exactly once
delivery? https://github.com/nathanmarz/storm

Nathan actually credits the Kafka_devs for the basic idea of transaction persisting in one of his talks.

Regards
Milind

On Nov 3, 2012 11:51 AM, "Rohit Prasad" <[EMAIL PROTECTED]> wrote:

> I agree that this approach only prevents duplicate messages to partition
> from the Producer side. There needs to be a similar approach on the
> consumer side too. Using Zk can be one solution, or other non-ZK
> approaches.
>
> Even if Consumer reads none or all messages of a transaction. But that does
> not solve the transaction problem yet. Because the business/application
> logic inside the Consumer thread may execute partially and fail. So it
> becomes tricky to decide the point when you want to say that you have
> "consumed" the message and increase consumption offset. If your consumer
> thread is saving some value into DB/HDFS/etc, ideally you want this save
> operation and consumption offset to be incremented atomically. That's why it
> boils down to Application logic implementing transactions and dealing with
> duplicates.
> Maybe a journalling or redo log approach on Consumer side can help build
> such a system.
>
> It will be nice if eventually kafka can be a transport which provides
> "exactly once" semantics for message delivery. Then consumer threads can be
> sure that they receive messages once, and can build appln logic on top of
> that.
>
> I have a use case similar to what Jay mentioned in a previous mail. I want
> to do aggregation but want the aggregated data to be correct, possibly
> avoiding duplicates in case of failures/crashes.
>
> On Fri, Nov 2, 2012 at 4:12 PM, Tom Brown <[EMAIL PROTECTED]> wrote:
>
> > That approach allows a producer to prevent duplicate messages to the
> > partition, but what about the consumer? In my case, I don't want the
> > consumer to be able to read any of the messages unless it can read all
> > of the messages from a transaction.
> >
> > I also like the idea of there being multiple types of Kafka
> > transaction, though, just to accommodate different performance,
> > reliability, and consumption patterns. Of course, the added complexity
> > of that might just sink the whole thing.
> >
> > --Tom
> >
> > On Fri, Nov 2, 2012 at 4:11 PM, Rohit Prasad <[EMAIL PROTECTED]> wrote:
> >
> > > Getting transactional support is quite a hard problem. There will always be
> > > corner cases where the solution will not work, unless you want to go down
> > > the path of 2PC, paxos, etc which of course will degrade kafka's
> > > performance. It is best to reconcile data and deal with duplicate messages
> > > in Application layer. Having said that it would be amazing if we can build
> > > "at most once" semantics in Kafka!!
> > >
> > > Regarding above approaches,
> > > The producer will always have a doubt if its commit went through. i.e. if
> > > the ack for "commit" is not received by the producer. Or if the producer dies
> > > immediately after calling the commit. When it is restarted how does it know
> > > if last operation went through?
> > >
> > > I suggest the following -
> > > 1. Producer should attach a timestamp at the beginning of each message and
> > > send it to Server.
> > > 2. On restarts/timeouts/re-connections, the producer should first read the
> > > last committed message from the leader of the partition.
> > > 3. From timestamp, it can know how many messages went through before it
> > > died (or connection was broken). And it can infer how many messages to
> > > replay.
> > >
> > > The above approach can be used with existing Kafka libraries since you can
> > > have a producer and consumer thread together in an application to implement
> > > this logic. Or someone can take the initiative to write a Transactional
> > > producer (which internally has both producer and a consumer to read last
> > > committed message.) I will be developing one for kafka 0.8 in c++.
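
For what it's worth, the three steps Rohit suggests in the quoted message (stamp each message, read the last committed message after a restart, replay only what is missing) could look roughly like the sketch below. This is not Kafka code. PartitionLog, replay_after_restart and the integer stamps are hypothetical names made up for illustration, and the "partition" is just an in-memory list. It assumes a single producer per partition that keeps its own ordered record of what it tried to send, and it uses a strictly increasing counter in place of a wall-clock timestamp so that "newer than the last committed message" is unambiguous.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class PartitionLog:
    """Hypothetical in-memory stand-in for one partition (not a Kafka API)."""
    entries: List[Tuple[int, str]] = field(default_factory=list)

    def append(self, stamp: int, payload: str) -> None:
        self.entries.append((stamp, payload))

    def last_committed(self) -> Optional[Tuple[int, str]]:
        # Step 2: the producer reads the last committed message.
        return self.entries[-1] if self.entries else None


def replay_after_restart(log: PartitionLog, pending: List[Tuple[int, str]]) -> int:
    """Resend only the pending messages that the log has not seen yet.

    `pending` is the producer's own ordered record of (stamp, payload) pairs
    it tried to send before it died or lost its connection (step 1).
    Returns the number of messages replayed (step 3).
    """
    last = log.last_committed()
    last_stamp = last[0] if last is not None else -1
    replayed = 0
    for stamp, payload in pending:
        if stamp > last_stamp:  # not in the log yet, so resending is safe
            log.append(stamp, payload)
            replayed += 1
    return replayed


log = PartitionLog()
pending = [(1, "a"), (2, "b"), (3, "c")]
# Simulate a crash: the first two writes landed, but the acks were never seen.
log.append(1, "a")
log.append(2, "b")
replayed = replay_after_restart(log, pending)
replayed_again = replay_after_restart(log, pending)  # a second recovery is a no-op
```

The sketch only works if the producer's record of pending messages survives the crash, and it says nothing about the consumer side raised earlier in the thread.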
-
Re: Transactional writing Jun Rao 2013-03-28, 15:23
Jonathan,
With a single writer, the producer can achieve exactly-once writes. If a send request fails, the producer first checks the end of the log to see whether the previous write succeeded. The producer resends only if the previous write failed. To do this, the producer needs the offsets of the appended messages. In 0.8, these offsets are not yet returned by our high-level producer API. We plan to extend the producer API post-0.8 to expose this information.

Thanks,
Jun

On Wed, Mar 27, 2013 at 2:41 PM, Jonathan Hodges <[EMAIL PROTECTED]> wrote:

> I know this is a really old thread, but it looked like the only pertinent
> one that came up when searching for ‘exactly once’ in the archives. I just
> want to confirm my understanding of the 0.8 version in that it still
> doesn’t completely support exactly once semantics. With the producer
> configured in sync mode and quorum commits there are still some edge case
> failure modes where the producer won’t receive the ack and resends the
> message(s). I think I read that the consumers don’t see uncommitted
> messages in the log, but I don’t think that addresses this producer case.
> Please correct me if I am missing something here.
>
> Don’t get me wrong we are very thankful for the 0.8 features. It offers by
> far the best message delivery guarantees out of the products we evaluated
> like Rabbit and ActiveMQ. We attempt to make our downstream consumer
> processes idempotent to mitigate this edge case, but it isn’t always
> feasible. Also the suggestion by Milind in this thread of using Storm for
> exactly once guarantees makes a lot of sense. Trident State seems to
> address this very issue
> (https://github.com/nathanmarz/storm/wiki/Trident-state) so we could just
> have it mediate our topics that required exactly once.
>
> -Jonathan
>
> On Sat, Nov 3, 2012 at 1:53 PM, Milind Parikh <[EMAIL PROTECTED]> wrote:
>
> > Why wouldn't the storm approach provide semantics of exactly once
> > delivery?
|
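
The single-writer scheme Jun describes at the top of this message (on a failed send, check the end of the log and resend only if the previous write did not land) can be sketched as follows. This is a toy model, not the Kafka producer API: FlakyLog, AckLost, fail_mode and send_once are hypothetical names invented for the example, and the log is an in-memory list that can be told to fail the next append either before or after the write lands. It assumes exactly one writer, so nothing else can append between the failed send and the check.

```python
from typing import List, Optional


class AckLost(Exception):
    """The request failed from the producer's point of view."""


class FlakyLog:
    """Hypothetical single-partition log whose next append may fail.

    fail_mode "before": the write is dropped, then the error is raised.
    fail_mode "after":  the write lands, but the ack is lost.
    """

    def __init__(self) -> None:
        self.messages: List[str] = []
        self.fail_mode: Optional[str] = None

    def end_offset(self) -> int:
        return len(self.messages)

    def append(self, payload: str) -> int:
        mode, self.fail_mode = self.fail_mode, None  # fail at most once
        if mode == "before":
            raise AckLost("write dropped")
        self.messages.append(payload)
        if mode == "after":
            raise AckLost("ack lost")
        return len(self.messages) - 1


def send_once(log: FlakyLog, payload: str) -> int:
    """Append payload exactly once, assuming this is the only writer."""
    expected = log.end_offset()  # offset the message gets if the write lands
    try:
        return log.append(payload)
    except AckLost:
        # Check the end of the log before deciding whether to resend.
        if log.end_offset() > expected and log.messages[expected] == payload:
            return expected  # the previous write succeeded; do not resend
        return log.append(payload)  # the previous write failed; resend once


log = FlakyLog()
log.fail_mode = "after"   # write lands, ack is lost
first = send_once(log, "m1")
log.fail_mode = "before"  # write is dropped
second = send_once(log, "m2")
```

The check depends on knowing which offset the message would have received, which is the information Jun says the 0.8 high-level producer API does not yet return. A retry that fails again is not handled here.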