Re: Multi Consumers
Hi,

I have just one consumer per topic. But I have quite a lot of topics.
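
With one connector per topic, each ConsumerConnector keeps its own ZooKeeper session, so many topics means many sessions. One option that might help is to ask a single connector for streams on all topics by passing a larger topic count map to createMessageStreams. A minimal sketch of building that map follows; the class name TopicCounts and the method forTopics are hypothetical helpers, not part of Kafka, and the topic names are made up.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Kafka): builds the topic -> stream count
// map that the high-level consumer's createMessageStreams() expects.
class TopicCounts {

    static Map<String, Integer> forTopics(List<String> topics, int threadsPerTopic) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        for (String topic : topics) {
            // one entry per topic; duplicate topic names collapse into one entry
            topicCountMap.put(topic, threadsPerTopic);
        }
        return topicCountMap;
    }

    public static void main(String[] args) {
        // Made-up topic names, for illustration only.
        Map<String, Integer> map = forTopics(Arrays.asList("clicks", "views", "orders"), 1);
        System.out.println(map.size() + " topics in one map");
    }
}
```

The resulting map would be passed to consumer.createMessageStreams(...) once, and the returned map then holds the list of streams for each topic, so each stream can still be handed to its own reader thread.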

>Also, to which servers are you seeing the timeouts - Zookeeper or kafka?
Zookeeper.
>Also, any reason why you are lowering the zk session timeout in
>your code?
It was copied and pasted from some tutorial.
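
For comparison, here is a minimal sketch of the same config helper without the lowered timeouts. It assumes the values 6000 and 2000 ms, which are, as far as I recall, the documented defaults for the 0.8 consumer; please check them against the docs for your version. The class name ConsumerProps is a hypothetical helper and the ZooKeeper address in main is a placeholder.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): builds consumer properties with
// a ZooKeeper session timeout well above the 400 ms used in the tutorial code.
class ConsumerProps {

    static Properties build(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        // Assumed values: believed to match the 0.8 defaults; verify for your version.
        props.put("zookeeper.session.timeout.ms", "6000");
        props.put("zookeeper.sync.time.ms", "2000");
        // Kept from the original code.
        props.put("auto.commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder address for a local development ZooKeeper.
        Properties props = build("localhost:2181", "group1");
        System.out.println(props.getProperty("zookeeper.session.timeout.ms"));
    }
}
```

The returned Properties object can be passed to new ConsumerConfig(props) exactly as in the original createConsumerConfig method.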
Cheers,

Klaus

On Thu, Jan 9, 2014 at 4:04 PM, Joel Koshy <[EMAIL PROTECTED]> wrote:

> Are all the consumers in the same group? If so there could be longer
> rebalance latencies; but in general you should be able to have that
> many consumers.
>
> Also, to which servers are you seeing the timeouts - Zookeeper or
> kafka? Also, any reason why you are lowering the zk session timeout in
> your code? Can you share some of your logs?
>
> Joel
>
> On Thu, Jan 9, 2014 at 5:07 AM, Klaus Schaefers
> <[EMAIL PROTECTED]> wrote:
> > Hi,
> >
> > How many consumers can I connect to a Kafka cluster? I am running a Kafka
> > server with a separate ZooKeeper server on my development box. For testing
> > purposes I am trying to create a large number of consumers, e.g. 100
> > or so. But once I create the 40th consumer I start to get timeouts. Also, I
> > have noticed that the connection time gets longer and longer.
> >
> >
> > My consumer code looks like this:
> >
> >
> >
> > public class KafkaConsumer implements Runnable{
> >
> >     public long count = 0;
> >
> >     public long errors = 0;
> >
> >     // listener assigned in the constructor
> >     private IMQListener<ILogEvent> fct;
> >
> >     private IMessageQueue context;
> >
> >     private ConsumerConnector consumer;
> >
> >     private int a_numThreads = 1;
> >
> >     private KafkaStream<byte[], byte[]> stream;
> >
> >     private volatile boolean running = true;
> >
> >     private String topic;
> >
> >
> >     @Override
> >     public void run() {
> >         Logger.log(KafkaConsumer.class, "run()", "enter >  " + this.topic);
> >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
> >         while (running && it.hasNext()){
> >              try {
> >
> >                  // do something
> >                  count++;
> >
> >              } catch (Exception e) {
> >                     errors++;
> >              }
> >          }
> >
> >     }
> >
> >
> >
> >
> >
> >     public KafkaConsumer(IMQListener<ILogEvent> fct, IMessageQueue context, String zookeper, String topic) {
> >         this.fct = fct;
> >         this.context = context;
> >         this.topic = topic;
> >
> >
> >         /**
> >          * get the kafka streams
> >          */
> >         ConsumerConfig consumerConfig = createConsumerConfig(zookeper, "group1");
> >         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
> >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >         topicCountMap.put(topic, new Integer(a_numThreads));
> >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
> >         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
> >         this.stream = streams.get(0);
> >
> >
> >         /**
> >          * start one thread reading from the queue
> >          */
> >         Thread t = new Thread(this);
> >         t.start();
> >
> >
> >     }
> >
> >
> >
> >
> >      public void shutdown() {
> >          Logger.log(KafkaConsumer.class, "shutdown()", "enter >  " + this.topic);
> >
> >          if (consumer != null)
> >                 consumer.shutdown();
> >
> >
> >          running = false;
> >      }
> >
> >
> >      private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
> >             Properties props = new Properties();
> >             props.put("zookeeper.connect", a_zookeeper);
> >             props.put("group.id", a_groupId);
> >             props.put("zookeeper.session.timeout.ms", "400");
> >             props.put("zookeeper.sync.time.ms", "200");
> >             props.put("auto.commit.interval.ms", "1000");
> >
> >             return new ConsumerConfig(props);
> >      }
> > }

Klaus Schaefers
Senior Optimization Manager

Ligatus GmbH
Hohenstaufenring 30-32
D-50674 Köln

Tel.:  +49 (0) 221 / 56939 -784
Fax:  +49 (0) 221 / 56 939 - 599
E-Mail: [EMAIL PROTECTED]
Web: www.ligatus.de

HRB Köln 56003
Geschäftsführung:
Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann,
Dipl.-Wirtschaftsingenieur Arne Wolter