
Flume, mail # user - Avro sink to source is too slow


Re: Avro sink to source is too slow
Mike Keane 2013-09-30, 20:02
As far as a fast disk goes: if you only have one, the drive head will be seeking constantly and performance will be awful; we were having problems at 10,000 log lines per second. I've pushed over 270,000 lines per second compressed.

I don't think it is Avro; I'm able to saturate a gigabit line easily, so ~100 MB/second of compressed data.

I don't see a sink group in your configuration; I'm curious what the default behavior is when you tie multiple sinks to a file channel without a sink group. That said, I found performance issues using a single file channel with compression. To get maximum performance I put a header on my events called "channel". Since our servers are all numbered, I take (server# mod 6) + 1 as the value of the "channel" header, which gives a fairly even distribution of log data (e.g., server 14 gets (14 mod 6) + 1 = 3, so its events go to channel_3). On my source I route data by the "channel" header to the appropriate channel. This parallelized the compression across 6 file channels. I then have 3 sinks per channel using a failover sink group.

Also, do you need compression level 9? I've found the gains from higher compression levels are negligible compared to the performance expense (not with flume/deflate specifically, but in general). Even with the compression level turned down to 1, my sink ran 6-7 times slower than with no compression at all; my solution was to parallelize the compression, and by trial and error I found this setup to be the best case.

agentName.sources.collector_source.selector.type = multiplexing
agentName.sources.collector_source.selector.header = channel
agentName.sources.collector_source.selector.mapping.1 = channel_1
agentName.sources.collector_source.selector.mapping.2 = channel_2
agentName.sources.collector_source.selector.mapping.3 = channel_3
agentName.sources.collector_source.selector.mapping.4 = channel_4
agentName.sources.collector_source.selector.mapping.5 = channel_5
agentName.sources.collector_source.selector.default = channel_6
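
For completeness, a failover sink group for one of those channels might look roughly like the following; the sink names, host, and priorities here are illustrative, not my actual production values:

# hypothetical sketch of one channel's failover sink group
# (the source also needs all six channels listed, e.g.
# agentName.sources.collector_source.channels = channel_1 channel_2 channel_3 channel_4 channel_5 channel_6)
agentName.sinkgroups = sg1
agentName.sinkgroups.sg1.sinks = sink_1a sink_1b sink_1c
agentName.sinkgroups.sg1.processor.type = failover
agentName.sinkgroups.sg1.processor.priority.sink_1a = 10
agentName.sinkgroups.sg1.processor.priority.sink_1b = 5
agentName.sinkgroups.sg1.processor.priority.sink_1c = 1
agentName.sinkgroups.sg1.processor.maxpenalty = 10000

agentName.sinks.sink_1a.type = avro
agentName.sinks.sink_1a.channel = channel_1
agentName.sinks.sink_1a.hostname = collector1.example.com
agentName.sinks.sink_1a.port = 4545
agentName.sinks.sink_1a.compression-type = deflate
agentName.sinks.sink_1a.compression-level = 1
# sink_1b and sink_1c would be defined the same way against the other collectors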
-Mike

On 09/30/2013 02:30 PM, Anat Rozenzon wrote:
AFAIK we have a fast disk.
However, I think the problem is with Avro and not the channel; as you can see in the metrics below, the channel filled quickly but is draining very slowly.
After a few minutes of running, only 70-80 batches had been sent by each sink.
{
"SINK.AvroSink1-4":{"BatchCompleteCount":"74","ConnectionFailedCount":"0","EventDrainAttemptCount":"74000","ConnectionCreatedCount":"3","Type":"SINK","BatchEmptyCount":"1","ConnectionClosedCount":"2","EventDrainSuccessCount":"71000","StopTime":"0","StartTime":"1380568140738","BatchUnderflowCount":"0"},
"SOURCE.logsdir":{"OpenConnectionCount":"0","Type":"SOURCE","AppendBatchAcceptedCount":"1330","AppendBatchReceivedCount":"1330","EventAcceptedCount":"1326298","AppendReceivedCount":"0","StopTime":"0","StartTime":"1380568140830","EventReceivedCount":"1326298","AppendAcceptedCount":"0"},
"CHANNEL.fileChannel":{"EventPutSuccessCount":"1326298","ChannelFillPercentage":"51.314899999999994","Type":"CHANNEL","StopTime":"0","EventPutAttemptCount":"1326298","ChannelSize":"1026298","StartTime":"1380568140730","EventTakeSuccessCount":"300000","ChannelCapacity":"2000000","EventTakeAttemptCount":"310073"},
"SINK.AvroSink1-2":{"BatchCompleteCount":"78","ConnectionFailedCount":"0","EventDrainAttemptCount":"78000","ConnectionCreatedCount":"3","Type":"SINK","BatchEmptyCount":"1","ConnectionClosedCount":"2","EventDrainSuccessCount":"75000","StopTime":"0","StartTime":"1380568140736","BatchUnderflowCount":"0"},
"SINK.AvroSink1-3":{"BatchCompleteCount":"81","ConnectionFailedCount":"0","EventDrainAttemptCount":"81000","ConnectionCreatedCount":"3","Type":"SINK","BatchEmptyCount":"1","ConnectionClosedCount":"2","EventDrainSuccessCount":"79000","StopTime":"0","StartTime":"1380568140736","BatchUnderflowCount":"0"},
"SINK.AvroSink1-1":{"BatchCompleteCount":"77","ConnectionFailedCount":"0","EventDrainAttemptCount":"77000","ConnectionCreatedCount":"2","Type":"SINK","BatchEmptyCount":"1","ConnectionClosedCount":"1","EventDrainSuccessCount":"75000","StopTime":"0","StartTime":"1380568140734","BatchUnderflowCount":"0"}}
On Mon, Sep 30, 2013 at 7:21 PM, Mike Keane <[EMAIL PROTECTED]> wrote:
What kind of disk configuration is on your file channel?  With a single-disk configuration (Dell blade server), performance was awful.  I believe what Flume needs at a minimum is a separate disk for the checkpoint and data directories.  When I switched to an SSD or a 13-disk RAID setup, my problems went away with one exception: compression was still very slow.  I ended up distributing my flow over several file channels to get good throughput with compression.
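
In config terms, that means something like the following, with the checkpoint and data directories on different physical disks; the paths are just examples, not actual mount points from this thread:

agent.channels.fileChannel.type = file
# keep these two directories on separate spindles (or SSD/RAID volumes)
agent.channels.fileChannel.checkpointDir = /disk1/flume/checkpoint
agent.channels.fileChannel.dataDirs = /disk2/flume/data
agent.channels.fileChannel.capacity = 2000000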

-Mike
On 09/30/2013 11:11 AM, Anat Rozenzon wrote:
Hi

I'm trying to read 100 MB of files using a spooling directory source, a file channel, and 4 Avro sinks feeding an Avro source running in another Flume process.
Both Flume processes are running on the same machine, just to eliminate network issues.

However, it takes more than 5 minutes to read and pass the 100 MB of data, which is too slow for our needs.

After about 1 minute the files have been read into the file channel; then there is quite a long period where the file channel drains really slowly through the four sinks.

Copying the same data using scp from a remote machine takes 7 seconds.

Below is my config; is there anything I can do to improve this?

agent.sources = logsdir
agent.sources.logsdir.type = spooldir
agent.sources.logsdir.channels = fileChannel
agent.sources.logsdir.spoolDir = %%WORK_DIR%%
agent.sources.logsdir.fileHeader = true
agent.sources.logsdir.batchSize=1000
agent.sources.logsdir.deletePolicy=immediate
agent.sources.logsdir.interceptors =  ihost iserver_type iserver_id
agent.sources.logsdir.interceptors.ihost.type = host
agent.sources.logsdir.interceptors.ihost.useIP = false
agent.sources.logsdir.interceptors.ihost.hostHeader = server_hostname

agent.sources.logsdir.interceptors.iserver_type.type = static
agent.sources.logsdir.interceptors.iserver_type.key = server_type
agent.sources.logsdir.interceptors.iserver_type.value = %%SE