Kafka, mail # user - Buffer sizing for consumers


Re: Buffer sizing for consumers
Jun Rao 2013-08-15, 14:58
Just be aware that today, a full message needs to be buffered in the
producer, the broker, and the consumer. So the larger the message, the larger
the memory requirement. This can also lead to more GC activity due to
fragmentation.
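A back-of-the-envelope sketch of what that buffering implies for consumer heap sizing, assuming the 0.8 high-level consumer and its queued.max.message.chunks setting; the chunk count and stream count below are illustrative assumptions, not numbers from the thread:

    public class ConsumerHeapEstimate {
        public static void main(String[] args) {
            long maxMessageBytes = 210L * 1024 * 1024; // largest message discussed in this thread
            int queuedChunks = 10;  // queued.max.message.chunks (assumed value)
            int numStreams = 4;     // KafkaStreams consumed in this JVM (assumed)
            // Worst case: every queued chunk of every stream holds a maximum-size fetch.
            long worstCaseBytes = maxMessageBytes * queuedChunks * numStreams;
            System.out.printf("Budget roughly %d MB of consumer heap%n", worstCaseBytes >> 20);
        }
    }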

Thanks,

Jun
On Wed, Aug 14, 2013 at 9:48 PM, Eric Sites <[EMAIL PROTECTED]> wrote:

> The average malware file size is about 1.5 MB and the whitelist file size
> is about 2.5 MB.
>
>
> Kafka 0.8 has been a very nice system for us; it allows us to build a very
> resilient, scalable information-processing system.
>
> We also move all the metadata from the workers through Kafka to
> multiple Hadoop clusters running HBase.
>
> We also push all of our metrics from TCollector through Kafka, using
> consumers in different consumer groups to push the metrics into clusters
> of OpenTSDB servers sitting in multiple Hadoop clusters.
>
> - Eric Sites
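A minimal sketch of the consumer-group fan-out described above, assuming the 0.8 high-level consumer API; the ZooKeeper address and group ids are placeholders. Because each group tracks its own offsets, every group receives a full copy of the metrics stream:

    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class MetricsFanOut {
        // One connector per OpenTSDB cluster; distinct group.id values mean
        // independent offsets, so each cluster sees the whole stream.
        static ConsumerConnector connectorFor(String groupId) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "zk1:2181"); // placeholder quorum
            props.put("group.id", groupId);
            return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        }

        public static void main(String[] args) {
            ConsumerConnector clusterA = connectorFor("opentsdb-a"); // placeholder ids
            ConsumerConnector clusterB = connectorFor("opentsdb-b");
        }
    }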
>
> On 8/15/13 12:41 AM, "Eric Sites" <[EMAIL PROTECTED]> wrote:
>
> >Lots of malware. 210 MB is the upper limit I want to support in the
> >system we are building.
> >
> >
> >Most of the messages are metrics, command messages, metadata about the
> >malware, etc.
> >
> >But so far Kafka 0.8 has worked out really well for moving the malware
> >files, whitelist files, and the unknown stuff around, placing them in
> >storage, and running them through our metadata-collection workers, across
> >hundreds of servers.
> >
> >I am just trying to track down some memory issues in one class of workers
> >that stores the whitelist files for testing our AV engine definitions
> >before release.
> >
> >Cheers,
> >Eric Sites
> >
> >On 8/15/13 12:27 AM, "Jun Rao" <[EMAIL PROTECTED]> wrote:
> >
> >>You need to set fetch.size in the consumer config to at least 210 MB plus
> >>the per-message overhead (about 10 bytes) in Kafka. What data are you
> >>sending? 210 MB for a single message is a bit unusual for Kafka.
> >>
> >>Thanks,
> >>
> >>Jun
> >>
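A minimal sketch of the createConsumerConfig() that Jun's advice implies, assuming the 0.8 high-level consumer; the ZooKeeper address and group id are placeholders, and the fetch property uses the fetch.size name given above (later 0.8 releases expose it as fetch.message.max.bytes). The broker's message.max.bytes would need a matching bump as well, though the thread doesn't cover that:

    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;

    public class LargeMessageConsumerConfig {
        // Sized for a single 210 MB message plus per-message overhead,
        // rounded up to 220 MB for headroom.
        static ConsumerConfig createConsumerConfig() {
            Properties props = new Properties();
            props.put("zookeeper.connect", "zk1:2181");  // placeholder ZooKeeper quorum
            props.put("group.id", "paradiso-workers");   // placeholder group id
            props.put("fetch.size", String.valueOf(220 * 1024 * 1024));
            return new ConsumerConfig(props);
        }
    }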
> >>
> >>On Wed, Aug 14, 2013 at 9:11 PM, Eric Sites <[EMAIL PROTECTED]> wrote:
> >>
> >>> Everyone,
> >>>
> >>> I need a little help figuring out how buffers are allocated in Kafka 0.8
> >>> consumers.
> >>>
> >>> What are the proper settings for a consumer that needs to receive a
> >>> single message that is 210 MB in size?
> >>>
> >>> The consumer listens to multiple topics, all with a single partition.
> >>> One of the topics is where the 210 MB message will come from; the other
> >>> topics carry very small messages.
> >>>
> >>>     consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
> >>>
> >>>     // One stream per topic; every topic has a single partition.
> >>>     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >>>     topicCountMap.put(paradiso_scan_job_topic, 1);
> >>>     topicCountMap.put(paradiso_scan_cancel_topic, 1);
> >>>     topicCountMap.put(paradiso_add_worker_name_topic, 1);
> >>>     topicCountMap.put(paradiso_file_delete_topic, 1);
> >>>
> >>>     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> >>>         consumer.createMessageStreams(topicCountMap);
> >>>
> >>>     // Start worker status threads to watch the other topics
> >>>     ParadisoWorkerCancelConsumer cancel_consumer = new ParadisoWorkerCancelConsumer(
> >>>         consumerMap.get(paradiso_scan_cancel_topic).get(0));
> >>>     cancel_consumer.start();
> >>>
> >>>     ParadisoWorkerFileAdd file_add = new ParadisoWorkerFileAdd(
> >>>         consumerMap.get(paradiso_add_worker_name_topic).get(0));
> >>>     file_add.start();
> >>>
> >>>     ParadisoWorkerFileDelete file_delete = new ParadisoWorkerFileDelete(
> >>>         consumerMap.get(paradiso_file_delete_topic).get(0));
> >>>     file_delete.start();
> >>>
> >>>     // The scan-job stream is the one that carries the 210 MB messages.
> >>>     KafkaStream<byte[], byte[]> stream = consumerMap.get(paradiso_scan_job_topic).get(0);
> >>>     ConsumerIterator<byte[], byte[]> it = stream.iterator();
> >>>
> >>>     while (it.hasNext() && !time_to_shutdown) {