Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka, mail # user - Mutli Consumers


Copy link to this message
-
Re: Mutli Consumers
Joel Koshy 2014-01-09, 15:05
Are all the consumers in the same group? If so there could be longer
rebalance latencies; but in general you should be able to have that
many consumers.

Also, to which servers are you seeing the timeouts - Zookeeper or
kafka? Also, any reason why you are lowering the zk session timeout in
your code? Can you share some of your logs?

Joel

On Thu, Jan 9, 2014 at 5:07 AM, Klaus Schaefers
<[EMAIL PROTECTED]> wrote:
> Hi,
>
> how many consumers can I connect to a kafka cluster? I am running a kafka
> server with a separate zookeeper server in my development box. For testing
> purpose I am now trying to create a a< large number of consumers, e.g. 100
> or so. But once I created the 40th consumer I start to get timeouts. Also I
> have noticed that the conencton time get longer an longer.
>
>
> My consumer code looks like this:
>
>
>
> public class KafkaConsumer implements Runnable{
>
>     public long count = 0;
>
>     public long errors = 0;
>
>     private IMessageQueue context;
>
>     private ConsumerConnector consumer;
>
>     private int a_numThreads = 1;
>
>     private KafkaStream<byte[], byte[]> stream;
>
>     private volatile boolean running = true;
>
>     private String topic;
>
>
>     @Override
>     public void run() {
>         Logger.log(KafkaConsumer.class, "run()", "enter >  " + this.topic);
>         ConsumerIterator<byte[], byte[]> it = stream.iterator();
>         while (running && it.hasNext()){
>              try {
>
>                  // do something
>                  count++;
>
>              } catch (Exception e) {
>                     errors++;
>              }
>          }
>
>     }
>
>
>
>
>
>     public KafkaConsumer(IMQListener<ILogEvent> fct, IMessageQueue context,
> String zookeper, String topic) {
>         this.fct = fct;
>         this.context = context;
>         this.topic = topic;
>
>
>         /**
>          * get the kafka streams
>          */
>         ConsumerConfig consumerConfig = createConsumerConfig(zookeper,
> "group1");
>         consumer =
> kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
>         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>         topicCountMap.put(topic, new Integer(a_numThreads));
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
>         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
>         this.stream = streams.get(0);
>
>
>         /**
>          * start one thread reading from the queue
>          */
>         Thread t = new Thread(this);
>         t.start();
>
>
>     }
>
>
>
>
>      public void shutdown() {
>          Logger.log(KafkaConsumer.class, "shutdown()", "enter >  " +
> this.topic);
>
>          if (consumer != null)
>                 consumer.shutdown();
>
>
>          running = false;
>      }
>
>
>      private static ConsumerConfig createConsumerConfig(String a_zookeeper,
> String a_groupId) {
>             Properties props = new Properties();
>             props.put("zookeeper.connect", a_zookeeper);
>             props.put("group.id", a_groupId);
>             props.put("zookeeper.session.timeout.ms", "400");
>             props.put("zookeeper.sync.time.ms", "200");
>             props.put("auto.commit.interval.ms", "1000");
>
>             return new ConsumerConfig(props);
>         }
>
>
>
> }
>
>
>
> Cheers,
>
> KLaus
>
> --
>
> --
>
> Klaus Schaefers
> Senior Optimization Manager
>
> Ligatus GmbH
> Hohenstaufenring 30-32
> D-50674 Köln
>
> Tel.:  +49 (0) 221 / 56939 -784
> Fax:  +49 (0) 221 / 56 939 - 599
> E-Mail: [EMAIL PROTECTED]
> Web: www.ligatus.de
>
> HRB Köln 56003
> Geschäftsführung:
> Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann,
> Dipl.-Wirtschaftsingenieur Arne Wolter