Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, cassandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, CentOS, red hat, debian, puppet labs, java, senseiDB
 Search Hadoop and all its subprojects:

Switch to Threaded View
Flume >> mail # user >> Avro sink to source is too slow


Copy link to this message
-
Re: Avro sink to source is too slow
Thank you Mike & Roshan.

I've changed both Flume instances (agent & collector) to run with memory channels, and I
also removed the compression for now.
I sample the EventAcceptedCount metric on the collector's avro source every 2
minutes, and it seems that during two minutes it received 1,027,000 records,
which is ~ 77MB.

It is a better throughput but still not what I expect.

This is my current agent config:

agent.sources = logsdir
agent.sources.logsdir.type = spooldir
agent.sources.logsdir.channels = fileChannel
agent.sources.logsdir.spoolDir = /disk/old_logs
agent.sources.logsdir.fileHeader = true
agent.sources.logsdir.batchSize=1000
agent.sources.logsdir.deletePolicy=immediate
agent.sources.logsdir.interceptors =  ihost iserver_type iserver_id
#agent.sources.logsdir.interceptors.ihost.type = host
#agent.sources.logsdir.interceptors.ihost.useIP = false
#agent.sources.logsdir.interceptors.ihost.hostHeader = server_hostname
agent.sources.logsdir.interceptors.ihost.type = static
agent.sources.logsdir.interceptors.ihost.key = server_hostname
agent.sources.logsdir.interceptors.ihost.value = hs666
agent.sources.logsdir.interceptors.iserver_type.type = static
agent.sources.logsdir.interceptors.iserver_type.key = server_type
agent.sources.logsdir.interceptors.iserver_type.value = Push
agent.sources.logsdir.interceptors.iserver_id.type = static
agent.sources.logsdir.interceptors.iserver_id.key = server_id
agent.sources.logsdir.interceptors.iserver_id.value = 1

agent.sources.logsdir.deserializer.maxLineLength = 10240
agent.channels = fileChannel
agent.channels.fileChannel.type = memory
agent.channels.fileChannel.capacity = 100000
agent.channels.fileChannel.transactionCapacity = 1000

#agent.channels.fileChannel.type = file
#agent.channels.fileChannel.checkpointDir=/mnt/flume/filechannel/checkpoint
#agent.channels.fileChannel.dataDirs=/mnt/flume/filechannel/data
#agent.channels.fileChannel.capacity=2000000
#agent.channels.fileChannel.transactionCapacity=1000
#agent.channels.fileChannel.use-fast-replay=true
#agent.channels.fileChannel.useDualCheckpoints=true
#agent.channels.fileChannel.backupCheckpointDir=/mnt/flume/filechannel/backupCheckpointDir
#agent.channels.fileChannel.minimumRequiredSpace=1073741824
#agent.channels.fileChannel.maxFileSize=524288000

agent.sinks = AvroSink1-1 AvroSink1-2 AvroSink1-3 AvroSink1-4

agent.sinks.AvroSink1-1.type = avro
agent.sinks.AvroSink1-1.channel = fileChannel
agent.sinks.AvroSink1-1.hostname = X.X.X.X
agent.sinks.AvroSink1-1.port = 45451
agent.sinks.AvroSink1-1.connect-timeout = 60000
agent.sinks.AvroSink1-1.request-timeout = 60000
agent.sinks.AvroSink1-1.batch-size = 1000
#agent.sinks.AvroSink1-1.compression-type=deflate
#agent.sinks.AvroSink1-1.compression-level=9

agent.sinks.AvroSink1-2.type = avro
agent.sinks.AvroSink1-2.channel = fileChannel
agent.sinks.AvroSink1-2.hostname = X.X.X.X
agent.sinks.AvroSink1-2.port = 45451
agent.sinks.AvroSink1-2.connect-timeout = 60000
agent.sinks.AvroSink1-2.request-timeout = 60000
agent.sinks.AvroSink1-2.batch-size = 1000
#agent.sinks.AvroSink1-2.compression-type=deflate
#agent.sinks.AvroSink1-2.compression-level=9

agent.sinks.AvroSink1-3.type = avro
agent.sinks.AvroSink1-3.channel = fileChannel
agent.sinks.AvroSink1-3.hostname = X.X.X.X
agent.sinks.AvroSink1-3.port = 45451
agent.sinks.AvroSink1-3.connect-timeout = 60000
agent.sinks.AvroSink1-3.request-timeout = 60000
agent.sinks.AvroSink1-3.batch-size = 1000
#agent.sinks.AvroSink1-3.compression-type=deflate
#agent.sinks.AvroSink1-3.compression-level=9

agent.sinks.AvroSink1-4.type = avro
agent.sinks.AvroSink1-4.channel = fileChannel
agent.sinks.AvroSink1-4.hostname = X.X.X.X
agent.sinks.AvroSink1-4.port = 45451
agent.sinks.AvroSink1-4.connect-timeout = 60000
agent.sinks.AvroSink1-4.request-timeout = 60000
agent.sinks.AvroSink1-4.batch-size = 1000
#agent.sinks.AvroSink1-4.compression-type=deflate
#agent.sinks.AvroSink1-4.compression-level=9

This is the relevant part of the collector config (in general 1 avro source
writes to 3 channels):

collector.sources = ExternalAvroSource

collector.sources.ExternalAvroSource.type = avro
collector.sources.ExternalAvroSource.bind = 0.0.0.0
collector.sources.ExternalAvroSource.port = 45451
#collector.sources.ExternalAvroSource.compression-type=deflate
#collector.sources.ExternalAvroSource.threads = 64
## Source writes to 3 channels, one for each sink (Fan Out)
collector.sources.ExternalAvroSource.channels = filechannel-backup filechannel-s3raw filechannel-s3prep-internal
collector.sources.ExternalAvroSource.selector.type = replicating
collector.sources.ExternalAvroSource.interceptors = iviber itime_default
collector.sources.ExternalAvroSource.interceptors.itime_default.type = static
collector.sources.ExternalAvroSource.interceptors.itime_default.preserveExisting = true
collector.sources.ExternalAvroSource.interceptors.itime_default.key = timestamp
collector.sources.ExternalAvroSource.interceptors.itime_default.value = 1
collector.sources.ExternalAvroSource.interceptors.iviber.type = com.viber.bigdata.flume.ViberInterceptor$Builder
collector.sources.ExternalAvroSource.interceptors.iviber.file_types= FILE
collector.sources.ExternalAvroSource.interceptors.iviber.collector_id=1

collector.channels = filechannel-backup filechannel-s3raw filechannel-s3prep-internal memorychannel-s3prep

collector.channels.filechannel-backup.type = memory
collector.channels.filechannel-backup.capacity = 1000000
collector.channels.filechannel-backup.transactionCapacity = 10000
#collector.channels.filechannel-backup.type = file
#collector.channels.filechannel-backup.checkpointDir=/disk3/flume_data/flume/collector1/channels/filechannel-backup/checkpoint
#collector.channels.filechannel-backup.dataDirs=/disk3/flume_data/flume/collector1/channels/filechannel-backup/data1,/disk3/flume_data/flume/collector1/channels/filechannel-backup/data2,/disk3/flume_data/flume/collector1/channels/filechannel-backup/data3,/disk3/flume_data/flume/collector1/channels/filechannel-backup/data4
#coll
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, cassandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, CentOS, red hat, debian, puppet labs, java, senseiDB