Flume configuration fail-over problems
Hi,

I am trying to set up a configuration where a single source agent load-balances between two aggregator agents, each of which multiplexes the flow to two endpoints. I don't think I have the channel capacity configured properly, and Flume always seems to end up hanging on me. If all channels are at capacity, should the source retry sending once they have emptied?
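
For what it's worth, my reading of the MemoryChannel documentation is that the keep-alive property (in seconds, default 3) controls how long a put blocks when the channel is full before failing, so I have also been experimenting with raising it so the source waits for the sinks to drain the channel instead of erroring out straight away:

#
# Sketch only: let puts block for up to 30 seconds when the channel is full
#
local_agent.channels.memoryChannel-1.keep-alive = 30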

Here is my configuration.

#
# Properties of memoryChannel
#
local_agent.channels.memoryChannel-1.type = memory
local_agent.channels.memoryChannel-1.capacity = 100000
local_agent.channels.memoryChannel-1.transactionCapacity = 1000

collector_agent_1.channels.memoryChannel-1.type = memory
collector_agent_1.channels.memoryChannel-1.capacity = 100000
collector_agent_1.channels.memoryChannel-1.transactionCapacity = 1000

collector_agent_1.channels.memoryChannel-2.type = memory
collector_agent_1.channels.memoryChannel-2.capacity = 100000
collector_agent_1.channels.memoryChannel-2.transactionCapacity = 1000

collector_agent_2.channels.memoryChannel-1.type = memory
collector_agent_2.channels.memoryChannel-1.capacity = 100000
collector_agent_2.channels.memoryChannel-1.transactionCapacity = 1000

collector_agent_2.channels.memoryChannel-2.type = memory
collector_agent_2.channels.memoryChannel-2.capacity = 100000
collector_agent_2.channels.memoryChannel-2.transactionCapacity = 1000

#
# Properties for spooling directory source
#
local_agent.sources.spooldir-1.type = spooldir
local_agent.sources.spooldir-1.spoolDir = ~/flume_test/ready
local_agent.sources.spooldir-1.fileHeader = true
local_agent.sources.spooldir-1.channels = memoryChannel-1

#
# Properties for the avro sink 1 agent to collector 1
#
local_agent.sinks.avroSink-1.type = avro
local_agent.sinks.avroSink-1.hostname = 127.0.0.1
local_agent.sinks.avroSink-1.port = 4545
local_agent.sinks.avroSink-1.channel = memoryChannel-1

#
# Properties for the avro sink agent to collector 2
#
local_agent.sinks.avroSink-2.type = avro
local_agent.sinks.avroSink-2.hostname = 127.0.0.1
local_agent.sinks.avroSink-2.port = 4546
local_agent.sinks.avroSink-2.channel = memoryChannel-1

#
# Properties for the avro source collector 1
#
collector_agent_1.sources.avroSource-1.type = avro
collector_agent_1.sources.avroSource-1.bind = 127.0.0.1
collector_agent_1.sources.avroSource-1.port = 4545
collector_agent_1.sources.avroSource-1.channels = memoryChannel-1 memoryChannel-2

#
# Properties for the avro source collector 2
#
collector_agent_2.sources.avroSource-2.type = avro
collector_agent_2.sources.avroSource-2.bind = 127.0.0.1
collector_agent_2.sources.avroSource-2.port = 4546
collector_agent_2.sources.avroSource-2.channels = memoryChannel-1 memoryChannel-2

# End points for collector 1

# ElasticSearch endpoint collector 1

collector_agent_1.sinks.elastic-search-sink-1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
collector_agent_1.sinks.elastic-search-sink-1.hostNames = 127.0.0.1:9300
collector_agent_1.sinks.elastic-search-sink-1.clusterName = elasticsearch
collector_agent_1.sinks.elastic-search-sink-1.batchSize = 10
collector_agent_1.sinks.elastic-search-sink-1.channel = memoryChannel-1

# HDFS endpoint collector 1

collector_agent_1.sinks.sink1.type = hdfs
collector_agent_1.sinks.sink1.hdfs.path = hdfs://hadoop-name-node1.dc1.ci-mang.van.dev.net:8020/flumeng_test
collector_agent_1.sinks.sink1.hdfs.fileType = DataStream
collector_agent_1.sinks.sink1.hdfs.rollInterval = 300
collector_agent_1.sinks.sink1.hdfs.rollSize = 0
collector_agent_1.sinks.sink1.hdfs.rollCount = 0
collector_agent_1.sinks.sink1.hdfs.batchSize = 1000
collector_agent_1.sinks.sink1.txnEventMax = 1000
collector_agent_1.sinks.sink1.serializer = avro_event
collector_agent_1.sinks.sink1.channel = memoryChannel-2

# ElasticSearch endpoint collector 2

collector_agent_2.sinks.elastic-search-sink-1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
collector_agent_2.sinks.elastic-search-sink-1.hostNames = 127.0.0.1:9300
collector_agent_2.sinks.elastic-search-sink-1.clusterName = elasticsearch
collector_agent_2.sinks.elastic-search-sink-1.batchSize = 10
collector_agent_2.sinks.elastic-search-sink-1.channel = memoryChannel-1

# HDFS endpoint collector 2

collector_agent_2.sinks.sink1.type = hdfs
collector_agent_2.sinks.sink1.hdfs.path = hdfs://hadoop-name-node1.dc1.ci-mang.van.dev.net:8020/flumeng_test_3
collector_agent_2.sinks.sink1.hdfs.fileType = DataStream
collector_agent_2.sinks.sink1.hdfs.rollInterval = 300
collector_agent_2.sinks.sink1.hdfs.rollSize = 0
collector_agent_2.sinks.sink1.hdfs.rollCount = 0
collector_agent_2.sinks.sink1.hdfs.batchSize = 1000
collector_agent_2.sinks.sink1.txnEventMax = 1000
collector_agent_2.sinks.sink1.serializer = avro_event
collector_agent_2.sinks.sink1.channel = memoryChannel-2

# Specify priorities for the sinks on the agent

local_agent.sinkgroups.ha.sinks = avroSink-1 avroSink-2
local_agent.sinkgroups.ha.processor.type = failover
local_agent.sinkgroups.ha.priority.avroSink-1 = 2
local_agent.sinkgroups.ha.priority.avroSink-2 = 1
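
While writing this up I also noticed that the failover example in the Flume user guide nests the priorities under the processor. prefix, so possibly the two lines above should read:

#
# Priorities as written in the user guide's failover example
#
local_agent.sinkgroups.ha.processor.priority.avroSink-1 = 2
local_agent.sinkgroups.ha.processor.priority.avroSink-2 = 1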

# Wire the source agents up

local_agent.sources = spooldir-1
local_agent.sinks = avroSink-1 avroSink-2
local_agent.sinkgroups = ha
local_agent.channels = memoryChannel-1

# Wire the collector agents up

collector_agent_1.sources = avroSource-1
collector_agent_1.sinks = elastic-search-sink-1 sink1
collector_agent_1.channels = memoryChannel-1 memoryChannel-2

collector_agent_2.sources = avroSource-2
collector_agent_2.sinks = elastic-search-sink-1 sink1
collector_agent_2.channels = memoryChannel-1 memoryChannel-2

I get the following exceptions on the collector nodes:

org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: memoryChannel-1}
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
    at org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:259)
    at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
    at sun.reflect.Deleg
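
Since the exception is about a put failing on a required channel, I have also been wondering whether I should mark one of the collector channels as optional via the replicating channel selector, so that one full channel cannot fail the whole batch. Something like this is what I have in mind (untested sketch, and I realize it would risk dropping events destined for the sink on the optional channel):

#
# Sketch only: replicate to both channels, but let failed puts to
# memoryChannel-2 be logged and ignored instead of throwing
# ChannelException back to the source
#
collector_agent_1.sources.avroSource-1.selector.type = replicating
collector_agent_1.sources.avroSource-1.selector.optional = memoryChannel-2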