Unable to put batch on required channel
Hello world,

I've been trying to set up a test instance of Flume and have been stymied
by recurring failures. I'm using a single Flume agent to move about
200 GB of data from a spooldir into a very small Hadoop cluster (3 nodes). If
Flume is the only thing writing to HDFS, everything works fine, but as soon
as another application starts writing data into the cluster, HDFS slows down
and Flume barfs with an "unable to put batch on required channel" exception.

I have tried all kinds of configuration changes, to no avail. I have tried
memory channels, file channels, small batch sizes (down to 50), large batch
sizes (up to 20000), increasing timeouts, increasing channel capacity (up
to 150 million), you name it. Sooner or later (usually 5-10 minutes after
restart) Flume comes to a halt. This is especially vexing considering that
it's copying from a file to a file: there are no real-time requirements that
might reasonably lead to a full channel in other circumstances. Anybody
have any advice? Insights? Wild guesses? Outright lies?
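
For concreteness, the knobs I mean look roughly like the sketch below. The values are illustrative placeholders (assumptions, not the exact settings from any single failing run) and the file channel paths are hypothetical. The property names are the standard ones for the memory channel, file channel, and HDFS sink.

```properties
# Illustrative sketch only: values and paths are assumptions,
# not the configuration that produced the exceptions below.

# memory channel variant
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000000
# must be at least as large as the source batchSize
agent.channels.c1.transactionCapacity = 10000
# seconds a put/take waits for space before giving up
agent.channels.c1.keep-alive = 30

# file channel variant (used instead of the memory block above)
# agent.channels.c1.type = file
# agent.channels.c1.checkpointDir = /path/to/checkpoint
# agent.channels.c1.dataDirs = /path/to/data
# agent.channels.c1.capacity = 1000000
# agent.channels.c1.transactionCapacity = 10000
# agent.channels.c1.keep-alive = 30

# HDFS sink: events per flush, and timeout for HDFS calls in milliseconds
agent.sinks.k1.hdfs.batchSize = 10000
agent.sinks.k1.hdfs.callTimeout = 60000
```

Whatever the combination, the channel only absorbs the difference between how fast the source puts events and how fast the sink takes them, so a larger capacity or keep-alive delays the failure rather than preventing it.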

Below are two exceptions from the log, one from a memory channel
configuration, one from a file channel configuration, and below that is the
most recent configuration file used. Absolutely any suggestions would be
appreciated.

Thanks,
Cameron

25 Sep 2013 21:05:12,262 ERROR [pool-5-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:195)  - FATAL: Spool Directory source r1: { spoolDir: /var/nrelate/flume-spool }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: c1}
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
    at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:189)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:165)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:679)
Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:128)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
    ... 9 more

25 Sep 2013 22:18:37,672 ERROR [pool-5-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:195)  - FATAL: Spool Directory source r1: { spoolDir: /var/nrelate/flume-spool }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
org.apache.flume.ChannelException: Unable to put batch on required channel: FileChannel c1 { dataDirs: [/var/lib/flume-ng/.flume/file-channel/data] }
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
    at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:189)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:165)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:679)
Caused by: org.apache.flume.ChannelException: The channel has reached it's capacity. This might be the result of a sink on the channel having too low of batch size, a downstream system running slower than normal, or that the channel capacity is just too low. [channel=c1]
    at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:468)
    at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
    at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:80)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:189)
    ... 9 more

# define the pipeline parts

agent.sources = r1
agent.sinks = k1
agent.channels = c1

agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1

# the main source, a spooldir

agent.sources.r1.type = spooldir
agent.sources.r1.spoolDir = /var/intheworld/flume-spool
agent.sources.r1.batchSize = 10000
agent.sources.r1.deserializer.maxLineLength = 10000
agent.sources.r1.interceptors = i1 i2

# parse out the timestamp and add to header
agent.sources.r1.interceptors.i1.type = regex_extractor
agent.sources.r1.interceptors.i1.regex = ^.*\\"ts\\":(\\d+).*$
agent.sources.r1.interceptors.i1.serializers = s1
agent.sources.r1.interceptors.i1.serializers.s1.name = timestamp

# also set host (hostname doesn't work properly, so set explicitly)
agent.sources.r1.interceptors.i2.type = static
agent.sources.r1.interceptors.i2.key = host
agent.sources.r1.interceptors.i2.value = Ess003726

# the sink, HDFS

agent.sinks.k1.type = hdfs
agent.sinks.k1.hdfs.path = hdfs://
a.h