producer exceptions when broker dies
I have a cluster of 3 Kafka brokers. With the following script I send some
data to Kafka and, in the middle, perform a controlled shutdown of one
broker. All 3 brokers are in the ISR before I start sending. When I shut
down the broker I get a couple of exceptions, and I expect that the data
from those failed sends is not written. Say I send 1500 lines and get 50
exceptions; I expect to consume 1450 lines, but instead I always consume
more, e.g. 1480 or 1490. I want to decide myself whether to retry sending,
instead of relying on message.send.max.retries, but it looks like if I
retry whenever there is an exception, I will end up with duplicates (see
the sketch below). Am I doing something wrong, or do I have wrong
assumptions about Kafka?
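
To illustrate, this is roughly the manual retry I have in mind (retryingSend
and maxAttempts are just illustrative names, not part of my actual code).
The problem: if request.timeout.ms expires but the broker actually committed
the batch, every retry writes the same messages again:

// Sketch of a manual retry wrapper (illustrative only).
// If the broker committed the batch but the ack was lost or timed out,
// the send throws and the retry re-writes the same messages: duplicates.
def retryingSend(prod: MyProducer, topic: String,
                 messages: List[String], maxAttempts: Int = 3): Boolean = {
  var attempt = 0
  while (attempt < maxAttempts) {
    try {
      prod.send(topic, messages)
      return true // acked by all in-sync replicas
    } catch {
      case e: Exception =>
        attempt += 1 // the write may still have succeeded broker-side
    }
  }
  false
}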

Thanks.

import java.util.Properties
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.StringEncoder

val prod = new MyProducer("10.80.42.147:9092,10.80.42.154:9092,10.80.42.156:9092")
var count = 0
for (line <- Source.fromFile(file).getLines()) {
  try {
    prod.send("benchmark", List(line)) // send each line as a one-message batch
    count += 1
    println("sent %d".format(count))
  } catch {
    case e: Exception => println("Exception: " + e)
  }
}

class MyProducer(brokerList: String) {
  val sync = true
  val requestRequiredAcks = "-1" // wait for acks from all in-sync replicas

  val props = new Properties()
  props.put("metadata.broker.list", brokerList)
  props.put("producer.type", if(sync) "sync" else "async")
  props.put("request.required.acks", requestRequiredAcks)
  props.put("key.serializer.class", classOf[StringEncoder].getName)
  props.put("serializer.class", classOf[StringEncoder].getName)
  props.put("message.send.max.retries", "0")
  props.put("request.timeout.ms", "2000")

  val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))

  def send(topic: String, messages: List[String]) = {
    val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
    for (message <- messages) {
      requests += new KeyedMessage(topic, null, message, message)
    }
    producer.send(requests)
  }
}
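
One way I could check that the extra lines are really duplicates, rather
than unrelated writes, would be to put a sequence number into the message
key and count distinct keys on the consumer side. A minimal sketch, reusing
the producer field of MyProducer above:

// Sketch: key each message with its line number so duplicates
// (the same key consumed twice) can be told apart from distinct lines.
var seq = 0
for (line <- Source.fromFile(file).getLines()) {
  try {
    // KeyedMessage(topic, key, partKey, message)
    prod.producer.send(new KeyedMessage[AnyRef, AnyRef]("benchmark", seq.toString, seq.toString, line))
  } catch {
    case e: Exception => println("Exception at line %d".format(seq))
  }
  seq += 1
}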
