Re: Multi Consumers
Hi,

I have just one consumer per topic. But I have quite a lot of topics.
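
As far as I know, each ConsumerConnector opens its own ZooKeeper session, so one consumer per topic means one session per topic. One possible option (an assumption, not something confirmed in this thread) is to request streams for several topics from a single connector, since createMessageStreams takes a map from topic name to thread count. A minimal sketch of building such a map follows; the class TopicCounts, the method forTopics and the topic names are hypothetical:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka: builds the topic -> thread count map only.
public class TopicCounts {

    // Returns a map that requests the same number of streams for every topic.
    public static Map<String, Integer> forTopics(List<String> topics, int threadsPerTopic) {
        Map<String, Integer> topicCountMap = new LinkedHashMap<String, Integer>();
        for (String topic : topics) {
            topicCountMap.put(topic, Integer.valueOf(threadsPerTopic));
        }
        return topicCountMap;
    }

    public static void main(String[] args) {
        // Placeholder topic names, one stream each.
        Map<String, Integer> map = forTopics(Arrays.asList("topicA", "topicB", "topicC"), 1);
        System.out.println(map);
    }
}
```

The resulting map could then be handed to a single consumer.createMessageStreams(topicCountMap) call, and each returned stream read in its own thread.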

>Also, to which servers are you seeing the timeouts - Zookeeper or kafka?
Zookeeper.
>Also, any reason why you are lowering the zk session timeout in
>your code?
It was copy-pasted from a tutorial.
Cheers,

Klaus

On Thu, Jan 9, 2014 at 4:04 PM, Joel Koshy <[EMAIL PROTECTED]> wrote:

> Are all the consumers in the same group? If so there could be longer
> rebalance latencies; but in general you should be able to have that
> many consumers.
>
> Also, to which servers are you seeing the timeouts - Zookeeper or
> kafka? Also, any reason why you are lowering the zk session timeout in
> your code? Can you share some of your logs?
>
> Joel
>
> On Thu, Jan 9, 2014 at 5:07 AM, Klaus Schaefers
> <[EMAIL PROTECTED]> wrote:
> > Hi,
> >
> > how many consumers can I connect to a Kafka cluster? I am running a Kafka
> > server with a separate ZooKeeper server on my development box. For testing
> > purposes I am now trying to create a large number of consumers, e.g. 100
> > or so. But once I have created the 40th consumer I start to get timeouts.
> > Also, I have noticed that the connection time gets longer and longer.
> >
> >
> > My consumer code looks like this:
> >
> >
> >
> > public class KafkaConsumer implements Runnable{
> >
> >     public long count = 0;
> >
> >     public long errors = 0;
> >
> >     private IMQListener<ILogEvent> fct;
> >
> >     private IMessageQueue context;
> >
> >     private ConsumerConnector consumer;
> >
> >     private int a_numThreads = 1;
> >
> >     private KafkaStream<byte[], byte[]> stream;
> >
> >     private volatile boolean running = true;
> >
> >     private String topic;
> >
> >
> >     @Override
> >     public void run() {
> >         Logger.log(KafkaConsumer.class, "run()", "enter >  " + this.topic);
> >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
> >         while (running && it.hasNext()){
> >              try {
> >
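> >                  // consume the next message so the iterator advances
> >                  byte[] payload = it.next().message();
> >                  // do something with payload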
> >                  count++;
> >
> >              } catch (Exception e) {
> >                     errors++;
> >              }
> >          }
> >
> >     }
> >
> >
> >
> >
> >
> >     public KafkaConsumer(IMQListener<ILogEvent> fct, IMessageQueue context,
> >             String zookeper, String topic) {
> >         this.fct = fct;
> >         this.context = context;
> >         this.topic = topic;
> >
> >
> >         /**
> >          * get the kafka streams
> >          */
> >         ConsumerConfig consumerConfig = createConsumerConfig(zookeper, "group1");
> >         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
> >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >         topicCountMap.put(topic, Integer.valueOf(a_numThreads));
> >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> >                 consumer.createMessageStreams(topicCountMap);
> >         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
> >         this.stream = streams.get(0);
> >
> >
> >         /**
> >          * start one thread reading from the queue
> >          */
> >         Thread t = new Thread(this);
> >         t.start();
> >
> >
> >     }
> >
> >
> >
> >
> >      public void shutdown() {
> >          Logger.log(KafkaConsumer.class, "shutdown()", "enter >  " + this.topic);
> >
> >          if (consumer != null)
> >                 consumer.shutdown();
> >
> >
> >          running = false;
> >      }
> >
> >
> >      private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
> >             Properties props = new Properties();
> >             props.put("zookeeper.connect", a_zookeeper);
> >             props.put("group.id", a_groupId);
> >             props.put("zookeeper.session.timeout.ms", "400");
> >             props.put("zookeeper.sync.time.ms", "200");
> >             props.put("auto.commit.interval.ms", "1000");
> >
> >             return new ConsumerConfig(props);
> >      }
> > }
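
Regarding the timeouts set in createConsumerConfig above: a 400 ms ZooKeeper session timeout leaves very little headroom for GC pauses or a busy development box, so sessions may well expire as more consumers connect. Below is a minimal sketch of the same properties with more generous values. The class ConsumerProps and the method build are hypothetical, and the numbers are what I understand the Kafka 0.8 defaults to be (6000 ms session timeout, 2000 ms sync time, 60000 ms auto-commit interval), so please check them against the documentation for your version.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: it only builds the consumer properties.
public class ConsumerProps {

    public static Properties build(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        // Assumed Kafka 0.8 defaults; the original code used 400 / 200 / 1000.
        props.put("zookeeper.session.timeout.ms", "6000");
        props.put("zookeeper.sync.time.ms", "2000");
        props.put("auto.commit.interval.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:2181" is a placeholder ZooKeeper address.
        Properties props = build("localhost:2181", "group1");
        System.out.println(props.getProperty("zookeeper.session.timeout.ms"));
    }
}
```

The returned Properties can be passed to new ConsumerConfig(props) in the same way as in the original code.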

Klaus Schaefers
Senior Optimization Manager

Ligatus GmbH
Hohenstaufenring 30-32
D-50674 Köln

Tel.:  +49 (0) 221 / 56939 -784
Fax:  +49 (0) 221 / 56 939 - 599
E-Mail: [EMAIL PROTECTED]
Web: www.ligatus.de

HRB Köln 56003
Geschäftsführung:
Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann,
Dipl.-Wirtschaftsingenieur Arne Wolter

 