Re: producer exceptions when broker dies
Hello Guozhang,

My partitions are split almost evenly between the brokers, so yes, the broker
that I shut down is the leader for some of them. Does that mean I can get an
exception while the data is still being written? Is there any setting on the
broker where I can control this? I.e., can I make the broker replication
timeout shorter than the producer timeout, so that if I get an exception I can
be sure the data is not committed?

Thanks.
On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <[EMAIL PROTECTED]> wrote:

> Hello Kane,
>
> As discussed in the other thread, even if a timeout response is sent back
> to the producer, the message may still be committed.
>
> Did you shut down the leader broker of the partition or a follower broker?
>
> Guozhang
>
> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <[EMAIL PROTECTED]> wrote:
>
> > I have a cluster of 3 Kafka brokers. With the following script I send some
> > data to Kafka and in the middle do a controlled shutdown of 1 broker. All
> > 3 brokers are in the ISR before I start sending. When I shut down the
> > broker I get a couple of exceptions, and I expect that data shouldn't be
> > written. Say I send 1500 lines and get 50 exceptions. I expect to consume
> > 1450 lines, but instead I always consume more, e.g. 1480 or 1490. I want
> > to decide myself whether to retry sending, not rely on
> > message.send.max.retries. But it looks like if I retry sending whenever
> > there is an exception, I will end up with duplicates. Is there anything
> > I'm doing wrong, or do I have wrong assumptions about Kafka?
> >
> > Thanks.
> >
> > val prod = new MyProducer("10.80.42.147:9092,10.80.42.154:9092,10.80.42.156:9092")
> > var count = 0
> > for (line <- Source.fromFile(file).getLines()) {
> >     try {
> >       // note: `buffer` is not defined anywhere in this snippet
> >       prod.send("benchmark", buffer.toList)
> >       count += 1
> >       println("sent %s".format(count))
> >     } catch {
> >       case _: Exception => println("Exception!")
> >     }
> > }
> >
> > class MyProducer(brokerList: String) {
> >   val sync = true
> >   val requestRequiredAcks = "-1"
> >
> >   val props = new Properties()
> >   props.put("metadata.broker.list", brokerList)
> >   props.put("producer.type", if(sync) "sync" else "async")
> >   props.put("request.required.acks", requestRequiredAcks)
> >   props.put("key.serializer.class", classOf[StringEncoder].getName)
> >   props.put("serializer.class", classOf[StringEncoder].getName)
> >   props.put("message.send.max.retries", "0")
> >   props.put("request.timeout.ms", "2000")
> >
> >   val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
> >
> >   def send(topic: String, messages: List[String]) = {
> >     val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
> >     for (message <- messages) {
> >       requests += new KeyedMessage(topic, null, message, message)
> >     }
> >     producer.send(requests)
> >   }
> > }
> >
>
>
>
> --
> -- Guozhang
>
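
Since an exception on the producer side does not guarantee that the message
was dropped, one possible way to retry by hand without ending up with
duplicates is to attach a unique id to every message and discard repeats on
the consumer side. The sketch below only illustrates that idea and does not
use the Kafka API: FlakyLog, sendWithRetry and dedup are hypothetical names,
and the per-message id is an assumption, not something the script above has.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a partition log: the record is appended first,
// then the call may throw, mimicking a timeout reported after the commit.
final class FlakyLog(failOn: Set[Int]) {
  val records = mutable.ArrayBuffer.empty[(String, String)] // (id, payload)
  private var calls = 0

  def append(id: String, payload: String): Unit = {
    calls += 1
    records += ((id, payload))
    if (failOn.contains(calls))
      throw new RuntimeException("request timed out")
  }
}

object DedupSketch {
  // Producer side: retry on failure, reusing the same id on every attempt.
  def sendWithRetry(log: FlakyLog, id: String, payload: String,
                    maxAttempts: Int = 3): Boolean = {
    var attempt = 0
    var acked = false
    while (!acked && attempt < maxAttempts) {
      attempt += 1
      try { log.append(id, payload); acked = true }
      catch { case _: RuntimeException => () }
    }
    acked
  }

  // Consumer side: keep only the first record seen for each id.
  def dedup(records: Iterable[(String, String)]): List[String] = {
    val seen = mutable.HashSet.empty[String]
    records.filter { case (id, _) => seen.add(id) }.map(_._2).toList
  }
}
```

With new FlakyLog(Set(2)) and three messages sent through sendWithRetry, the
second message is written twice, so the log holds four records, while dedup
returns the three distinct payloads in their original order. The cost is that
the consumer has to remember which ids it has already seen.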