Flume configuration fail-over problems
Cameron Gandevia 2012-10-16, 18:23
Hi
I am trying to set up a configuration where a single source agent load balances between two aggregate agents, which multiplex the flow to two end points. I don't think I have the channel capacity configured properly, and Flume always seems to end up hanging. If all channels are at capacity, should the source try to send again once they have emptied?

Here is my configuration.

#
# Properties of memoryChannel
#
local_agent.channels.memoryChannel-1.type = memory
local_agent.channels.memoryChannel-1.capacity = 100000
local_agent.channels.memoryChannel-1.transactionCapacity = 1000

collector_agent_1.channels.memoryChannel-1.type = memory
collector_agent_1.channels.memoryChannel-1.capacity = 100000
collector_agent_1.channels.memoryChannel-1.transactionCapacity = 1000

collector_agent_1.channels.memoryChannel-2.type = memory
collector_agent_1.channels.memoryChannel-2.capacity = 100000
collector_agent_1.channels.memoryChannel-2.transactionCapacity = 1000

collector_agent_2.channels.memoryChannel-1.type = memory
collector_agent_2.channels.memoryChannel-1.capacity = 100000
collector_agent_2.channels.memoryChannel-1.transactionCapacity = 1000

collector_agent_2.channels.memoryChannel-2.type = memory
collector_agent_2.channels.memoryChannel-2.capacity = 100000
collector_agent_2.channels.memoryChannel-2.transactionCapacity = 1000

#
# Properties for spooling directory source
#
local_agent.sources.spooldir-1.type = spooldir
local_agent.sources.spooldir-1.spoolDir = ~/flume_test/ready
local_agent.sources.spooldir-1.fileHeader = true
local_agent.sources.spooldir-1.channels = memoryChannel-1

#
# Properties for the avro sink 1 agent to collector 1
#
local_agent.sinks.avroSink-1.type = avro
local_agent.sinks.avroSink-1.hostname = 127.0.0.1
local_agent.sinks.avroSink-1.port = 4545
local_agent.sinks.avroSink-1.channel = memoryChannel-1

#
# Properties for the avro sink agent to collector 2
#
local_agent.sinks.avroSink-2.type = avro
local_agent.sinks.avroSink-2.hostname = 127.0.0.1
local_agent.sinks.avroSink-2.port = 4546
local_agent.sinks.avroSink-2.channel = memoryChannel-1

#
# Properties for the avro source collector 1
#
collector_agent_1.sources.avroSource-1.type = avro
collector_agent_1.sources.avroSource-1.bind = 127.0.0.1
collector_agent_1.sources.avroSource-1.port = 4545
collector_agent_1.sources.avroSource-1.channels = memoryChannel-1 memoryChannel-2

#
# Properties for the avro source collector 2
#
collector_agent_2.sources.avroSource-2.type = avro
collector_agent_2.sources.avroSource-2.bind = 127.0.0.1
collector_agent_2.sources.avroSource-2.port = 4546
collector_agent_2.sources.avroSource-2.channels = memoryChannel-1 memoryChannel-2

# End points for collector 1
# ElasticSearch endpoint collector 1
collector_agent_1.sinks.elastic-search-sink-1.type org.apache.flume.sink.elasticsearch.ElasticSearchSink
collector_agent_1.sinks.elastic-search-sink-1.hostNames = 127.0.0.1:9300
collector_agent_1.sinks.elastic-search-sink-1.clusterName = elasticsearch
collector_agent_1.sinks.elastic-search-sink-1.batchSize = 10
collector_agent_1.sinks.elastic-search-sink-1.channel = memoryChannel-1

# HDFS endpoint collector 1
collector_agent_1.sinks.sink1.type = hdfs
collector_agent_1.sinks.sink1.hdfs.path = hdfs:// hadoop-name-node1.dc1.ci-mang.van.dev.net:8020/flumeng_test
collector_agent_1.sinks.sink1.hdfs.fileType = DataStream
collector_agent_1.sinks.sink1.hdfs.rollInterval = 300
collector_agent_1.sinks.sink1.hdfs.rollSize = 0
collector_agent_1.sinks.sink1.hdfs.rollCount = 0
collector_agent_1.sinks.sink1.hdfs.batchSize = 1000
collector_agent_1.sinks.sink1.txnEventMax = 1000
collector_agent_1.sinks.sink1.serializer = avro_event
collector_agent_1.sinks.sink1.channel = memoryChannel-2

# ElasticSearch endpoint collector 2
collector_agent_2.sinks.elastic-search-sink-1.type org.apache.flume.sink.elasticsearch.ElasticSearchSink
collector_agent_2.sinks.elastic-search-sink-1.hostNames = 127.0.0.1:9300
collector_agent_2.sinks.elastic-search-sink-1.clusterName = elasticsearch
collector_agent_2.sinks.elastic-search-sink-1.batchSize = 10
collector_agent_2.sinks.elastic-search-sink-1.channel = memoryChannel-1

# HDFS endpoint collector 2
collector_agent_2.sinks.sink1.type = hdfs
collector_agent_2.sinks.sink1.hdfs.path = hdfs:// hadoop-name-node1.dc1.ci-mang.van.dev.net:8020/flumeng_test_3
collector_agent_2.sinks.sink1.hdfs.fileType = DataStream
collector_agent_2.sinks.sink1.hdfs.rollInterval = 300
collector_agent_2.sinks.sink1.hdfs.rollSize = 0
collector_agent_2.sinks.sink1.hdfs.rollCount = 0
collector_agent_2.sinks.sink1.hdfs.batchSize = 1000
collector_agent_2.sinks.sink1.txnEventMax = 1000
collector_agent_2.sinks.sink1.serializer = avro_event
collector_agent_2.sinks.sink1.channel = memoryChannel-2

# Specify priorities for the sinks on the agent
local_agent.sinkgroups.ha.sinks = avroSink-1 avroSink-2
local_agent.sinkgroups.ha.processor.type = failover
local_agent.sinkgroups.ha.priority.avroSink-1 = 2
local_agent.sinkgroups.ha.priority.avroSink-2 = 1

# Wire the source agents up
local_agent.sources = spooldir-1
local_agent.sinks = avroSink-1 avroSink-2
local_agent.sinkgroups = ha
local_agent.channels = memoryChannel-1

# Wire the collector agents up
collector_agent_1.sources = avroSource-1
collector_agent_1.sinks = elastic-search-sink-1 sink1
collector_agent_1.channels = memoryChannel-1 memoryChannel-2

collector_agent_2.sources = avroSource-2
collector_agent_2.sinks = elastic-search-sink-1 sink1
collector_agent_2.channels = memoryChannel-1 memoryChannel-2

I get the following exceptions on the collector nodes:

org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: memoryChannel-1}
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
    at org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:259)
    at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
    at sun.reflect.Deleg