Kafka >> mail # user >> Buffer sizing for consumers


Re: Buffer sizing for consumers
Just be aware that today, a full message needs to be buffered in the
producer, broker, and consumer. So the larger the message, the larger the
memory requirement. This can also lead to more GC activity due to
fragmentation.

Thanks,

Jun
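Jun's point above is that every hop in the pipeline must be able to hold the largest message in memory, so the size limits on each side have to be raised together. A minimal sketch of what that implies, using 0.8-era broker property names (`message.max.bytes`, `replica.fetch.max.bytes`) as an assumption to be checked against your broker's documentation:

```java
import java.util.Properties;

public class BrokerSizeConfig {
    // Rule of thumb from the thread: every limit must be at least the
    // largest message plus per-message overhead (~10 bytes per Jun's note);
    // a bit of extra headroom is added here for safety.
    static final long MAX_MESSAGE_BYTES = 210L * 1024 * 1024 + 1024;

    public static Properties brokerProps() {
        Properties props = new Properties();
        // Largest message the broker will accept (0.8-era name, an assumption):
        props.setProperty("message.max.bytes", Long.toString(MAX_MESSAGE_BYTES));
        // Replica fetches must also be able to pull a full message:
        props.setProperty("replica.fetch.max.bytes", Long.toString(MAX_MESSAGE_BYTES));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(brokerProps());
    }
}
```

The key design consequence is the memory one Jun names: with 210 MB messages, each in-flight message pins roughly that much heap on each hop, so concurrent fetches multiply the requirement.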
On Wed, Aug 14, 2013 at 9:48 PM, Eric Sites <[EMAIL PROTECTED]> wrote:

> The average malware file size is about 1.5 MB and the whitelist file size
> is about 2.5 MB.
>
>
> Kafka 0.8 is a very nice system for us; it allows us to have a very
> resilient, scalable information-processing system.
>
> We also move all the metadata around from the workers through Kafka to
> multiple Hadoop clusters running HBase.
>
> We also push all of our metrics from TCollector through Kafka and use
> multiple consumers, in different consumer groups, to push the metrics
> into a cluster of OpenTSDB servers sitting in multiple Hadoop clusters.
>
> - Eric Sites
>
> On 8/15/13 12:41 AM, "Eric Sites" <[EMAIL PROTECTED]> wrote:
>
> >Lots of malware. 210 MB is the upper limit I want to support in the
> >system we are building.
> >
> >
> >Most of the messages are metrics, command messages, metadata about the
> >malware, etc...
> >
> >But so far Kafka 0.8 has worked out really well for moving the malware
> >files, whitelist files, and the unknown stuff around to place them in
> >storage, and run them through our metadata-collection workers. Hundreds
> >of servers.
> >
> >I am just trying to track down some memory issues on one class of workers
> >that store the whitelist files for testing our AV engine definitions
> >before release.
> >
> >Cheers,
> >Eric Sites
> >
> >On 8/15/13 12:27 AM, "Jun Rao" <[EMAIL PROTECTED]> wrote:
> >
> >>You need to set fetch.size in the consumer config to be at least 210 MB
> >>plus message overhead (about 10 bytes) in Kafka. What data are you
> >>sending? 210 MB for a single message is a bit unusual for Kafka.
> >>
> >>Thanks,
> >>
> >>Jun
> >>
> >>
> >>>On Wed, Aug 14, 2013 at 9:11 PM, Eric Sites
> >>><[EMAIL PROTECTED]> wrote:
> >>
> >>> Everyone,
> >>>
> >>> I need a little help figuring out how buffers are allocated in Kafka
> >>> consumers ver 0.8.
> >>>
> >>> What are the proper settings for a consumer that needs to receive a
> >>>single
> >>> message that is 210 MB in size?
> >>>
> >>> The consumer listens to multiple topics, all with a single partition.
> >>>One
> >>> of the topics is where the 210 MB message will come from,
> >>> and the other topics will carry very small messages.
> >>>
> >>>     consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
> >>>     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >>>     topicCountMap.put(paradiso_scan_job_topic, 1);
> >>>     topicCountMap.put(paradiso_scan_cancel_topic, 1);
> >>>     topicCountMap.put(paradiso_add_worker_name_topic, 1);
> >>>     topicCountMap.put(paradiso_file_delete_topic, 1);
> >>>
> >>>     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> >>>         consumer.createMessageStreams(topicCountMap);
> >>>
> >>>     // Start worker status threads to watch other topics
> >>>     ParadisoWorkerCancelConsumer cancel_consumer = new
> >>>         ParadisoWorkerCancelConsumer(consumerMap.get(paradiso_scan_cancel_topic).get(0));
> >>>     cancel_consumer.start();
> >>>     ParadisoWorkerFileAdd file_add = new
> >>>         ParadisoWorkerFileAdd(consumerMap.get(paradiso_add_worker_name_topic).get(0));
> >>>     file_add.start();
> >>>     ParadisoWorkerFileDelete file_delete = new
> >>>         ParadisoWorkerFileDelete(consumerMap.get(paradiso_file_delete_topic).get(0));
> >>>     file_delete.start();
> >>>
> >>>     KafkaStream<byte[], byte[]> stream =
> >>>         consumerMap.get(paradiso_scan_job_topic).get(0);
> >>>     ConsumerIterator<byte[], byte[]> it = stream.iterator();
> >>>
> >>>     while (it.hasNext() && !time_to_shutdown) {
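Per Jun's earlier reply, the fix for Eric's consumer would be to raise the consumer fetch size to cover the largest expected message plus overhead. A hedged sketch of what `createConsumerConfig()` above might set (the property name `fetch.message.max.bytes` is an assumption based on the 0.8 high-level consumer; in 0.7 the name was `fetch.size` as Jun wrote, and the group.id/ZooKeeper values below are hypothetical placeholders):

```java
import java.util.Properties;

public class ConsumerFetchConfig {
    // Build 0.8-style high-level consumer properties sized for a given
    // largest message. Jun's guidance: fetch size >= largest message plus
    // ~10 bytes per-message overhead; a larger margin is used here.
    public static Properties consumerProps(long largestMessageBytes) {
        long fetchSize = largestMessageBytes + 10 * 1024; // generous overhead margin
        Properties props = new Properties();
        props.setProperty("group.id", "paradiso-workers");        // hypothetical group
        props.setProperty("zookeeper.connect", "localhost:2181"); // hypothetical ZK address
        // Must be large enough to hold the 210 MB message in one fetch:
        props.setProperty("fetch.message.max.bytes", Long.toString(fetchSize));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps(210L * 1024 * 1024));
    }
}
```

Note that because all four topics are consumed through the same connector, this one fetch-size setting applies to the small-message topics too, which is part of why the memory footprint of this worker class grows with the largest message anywhere in the map.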

 