Flume, mail # user - Avro sink to source is too slow


Re: Avro sink to source is too slow
Anat Rozenzon 2013-10-01, 12:41
Thank you Mike & Roshan.

I've changed both Flume instances (agent & collector) to run with memory
channels, and I also removed the compression for now.
I sample the EventAcceptedCount metric on the collector's avro source every 2
minutes, and it seems that during two minutes it received 1,027,000 records,
which is ~77MB.
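
For reference, the sampled figures above can be turned into per-second rates. This is only a rough sketch: it assumes the sample window is exactly 120 seconds and that "77MB" means 77 * 10^6 bytes, and the function name is made up for illustration.

```python
def throughput(records, megabytes, window_seconds):
    """Convert one sample window into per-second rates and an average event size."""
    events_per_sec = records / window_seconds
    mb_per_sec = megabytes / window_seconds
    # Assumption: 1 MB = 1,000,000 bytes.
    avg_event_bytes = megabytes * 1_000_000 / records
    return events_per_sec, mb_per_sec, avg_event_bytes

# Figures quoted in the message: 1,027,000 records, ~77MB, 2-minute window.
events_per_sec, mb_per_sec, avg_event_bytes = throughput(1_027_000, 77, 120)
print(f"{events_per_sec:.0f} events/s, {mb_per_sec:.2f} MB/s, ~{avg_event_bytes:.0f} bytes/event")
# prints: 8558 events/s, 0.64 MB/s, ~75 bytes/event
```

So the collector is accepting roughly 8,500 small events per second, well under 1 MB/s.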

That is better throughput, but still not what I expect.

This is my current agent config:

agent.sources = logsdir
agent.sources.logsdir.type = spooldir
agent.sources.logsdir.channels = fileChannel
agent.sources.logsdir.spoolDir = /disk/old_logs
agent.sources.logsdir.fileHeader = true
agent.sources.logsdir.batchSize=1000
agent.sources.logsdir.deletePolicy=immediate
agent.sources.logsdir.interceptors =  ihost iserver_type iserver_id
#agent.sources.logsdir.interceptors.ihost.type = host
#agent.sources.logsdir.interceptors.ihost.useIP = false
#agent.sources.logsdir.interceptors.ihost.hostHeader = server_hostname
agent.sources.logsdir.interceptors.ihost.type = static
agent.sources.logsdir.interceptors.ihost.key = server_hostname
agent.sources.logsdir.interceptors.ihost.value = hs666
agent.sources.logsdir.interceptors.iserver_type.type = static
agent.sources.logsdir.interceptors.iserver_type.key = server_type
agent.sources.logsdir.interceptors.iserver_type.value = Push
agent.sources.logsdir.interceptors.iserver_id.type = static
agent.sources.logsdir.interceptors.iserver_id.key = server_id
agent.sources.logsdir.interceptors.iserver_id.value = 1

agent.sources.logsdir.deserializer.maxLineLength = 10240
agent.channels = fileChannel
agent.channels.fileChannel.type = memory
agent.channels.fileChannel.capacity = 100000
agent.channels.fileChannel.transactionCapacity = 1000

#agent.channels.fileChannel.type = file
#agent.channels.fileChannel.checkpointDir=/mnt/flume/filechannel/checkpoint
#agent.channels.fileChannel.dataDirs=/mnt/flume/filechannel/data
#agent.channels.fileChannel.capacity=2000000
#agent.channels.fileChannel.transactionCapacity=1000
#agent.channels.fileChannel.use-fast-replay=true
#agent.channels.fileChannel.useDualCheckpoints=true
#agent.channels.fileChannel.backupCheckpointDir=/mnt/flume/filechannel/backupCheckpointDir
#agent.channels.fileChannel.minimumRequiredSpace=1073741824
#agent.channels.fileChannel.maxFileSize=524288000

agent.sinks = AvroSink1-1 AvroSink1-2 AvroSink1-3 AvroSink1-4

agent.sinks.AvroSink1-1.type = avro
agent.sinks.AvroSink1-1.channel = fileChannel
agent.sinks.AvroSink1-1.hostname = X.X.X.X
agent.sinks.AvroSink1-1.port = 45451
agent.sinks.AvroSink1-1.connect-timeout = 60000
agent.sinks.AvroSink1-1.request-timeout = 60000
agent.sinks.AvroSink1-1.batch-size = 1000
#agent.sinks.AvroSink1-1.compression-type=deflate
#agent.sinks.AvroSink1-1.compression-level=9

agent.sinks.AvroSink1-2.type = avro
agent.sinks.AvroSink1-2.channel = fileChannel
agent.sinks.AvroSink1-2.hostname = X.X.X.X
agent.sinks.AvroSink1-2.port = 45451
agent.sinks.AvroSink1-2.connect-timeout = 60000
agent.sinks.AvroSink1-2.request-timeout = 60000
agent.sinks.AvroSink1-2.batch-size = 1000
#agent.sinks.AvroSink1-2.compression-type=deflate
#agent.sinks.AvroSink1-2.compression-level=9

agent.sinks.AvroSink1-3.type = avro
agent.sinks.AvroSink1-3.channel = fileChannel
agent.sinks.AvroSink1-3.hostname = X.X.X.X
agent.sinks.AvroSink1-3.port = 45451
agent.sinks.AvroSink1-3.connect-timeout = 60000
agent.sinks.AvroSink1-3.request-timeout = 60000
agent.sinks.AvroSink1-3.batch-size = 1000
#agent.sinks.AvroSink1-3.compression-type=deflate
#agent.sinks.AvroSink1-3.compression-level=9

agent.sinks.AvroSink1-4.type = avro
agent.sinks.AvroSink1-4.channel = fileChannel
agent.sinks.AvroSink1-4.hostname = X.X.X.X
agent.sinks.AvroSink1-4.port = 45451
agent.sinks.AvroSink1-4.connect-timeout = 60000
agent.sinks.AvroSink1-4.request-timeout = 60000
agent.sinks.AvroSink1-4.batch-size = 1000
#agent.sinks.AvroSink1-4.compression-type=deflate
#agent.sinks.AvroSink1-4.compression-level=9
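
As a rough sanity check on the sink settings above, the measured rate can be expressed per sink. This is a sketch under two assumptions that the message does not confirm: the four sinks share the load evenly, and each sink sends one batch at a time. The helper name is hypothetical.

```python
# Figures from the message: 1,027,000 events in a 2-minute sample,
# four avro sinks, batch-size = 1000 on each sink.
EVENTS = 1_027_000
WINDOW_SECONDS = 120
SINK_COUNT = 4
BATCH_SIZE = 1000

def per_sink_batch_stats(events, window_seconds, sink_count, batch_size):
    """Return (batches per second per sink, implied milliseconds per batch).

    Assumes an even split across sinks and one in-flight batch per sink.
    """
    events_per_sec_per_sink = events / window_seconds / sink_count
    batches_per_sec = events_per_sec_per_sink / batch_size
    ms_per_batch = 1000 / batches_per_sec
    return batches_per_sec, ms_per_batch

batches_per_sec, ms_per_batch = per_sink_batch_stats(
    EVENTS, WINDOW_SECONDS, SINK_COUNT, BATCH_SIZE
)
print(f"{batches_per_sec:.2f} batches/s per sink, ~{ms_per_batch:.0f} ms per batch")
# prints: 2.14 batches/s per sink, ~467 ms per batch
```

Under those assumptions, each 1000-event batch takes on the order of half a second end to end, which may help when deciding whether to look at the network round trip or at the collector side.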

This is the relevant part of the collector config (in general 1 avro source
writes to 3 channels):

collector.sources = ExternalAvroSource

collector.sources.ExternalAvroSource.type = avro
collector.sources.ExternalAvroSource.bind = 0.0.0.0
collector.sources.ExternalAvroSource.port = 45451
#collector.sources.ExternalAvroSource.compression-type=deflate
#collector.sources.ExternalAvroSource.threads = 64
## Source writes to 3 channels, one for each sink (Fan Out)
collector.sources.ExternalAvroSource.channels = filechannel-backup filechannel-s3raw filechannel-s3prep-internal
collector.sources.ExternalAvroSource.selector.type = replicating
collector.sources.ExternalAvroSource.interceptors = iviber itime_default
collector.sources.ExternalAvroSource.interceptors.itime_default.type = static
collector.sources.ExternalAvroSource.interceptors.itime_default.preserveExisting = true
collector.sources.ExternalAvroSource.interceptors.itime_default.key = timestamp
collector.sources.ExternalAvroSource.interceptors.itime_default.value = 1
collector.sources.ExternalAvroSource.interceptors.iviber.type = com.viber.bigdata.flume.ViberInterceptor$Builder
collector.sources.ExternalAvroSource.interceptors.iviber.file_types= FILE
collector.sources.ExternalAvroSource.interceptors.iviber.collector_id=1

collector.channels = filechannel-backup filechannel-s3raw filechannel-s3prep-internal memorychannel-s3prep

collector.channels.filechannel-backup.type = memory
collector.channels.filechannel-backup.capacity = 1000000
collector.channels.filechannel-backup.transactionCapacity = 10000
#collector.channels.filechannel-backup.type = file
#collector.channels.filechannel-backup.checkpointDir=/disk3/flume_data/flume/collector1/channels/filechannel-backup/checkpoint
#collector.channels.filechannel-backup.dataDirs=/disk3/flume_data/flume/collector1/channels/filechannel-backup/data1,/disk3/flume_data/flume/collector1/channels/filechannel-backup/data2,/disk3/flume_data/flume/collector1/channels/filechannel-backup/data3,/disk3/flume_data/flume/collector1/channels/filechannel-backup/data4
#coll