
Flume >> mail # user >> Lock contention in FileChannel

Re: Lock contention in FileChannel
Since the channel is designed to make sure that events are not duplicated to multiple sinks, and to protect against corruption due to concurrency issues, we do need the locking in the channel's FlumeEventQueue. It is unlikely that the locking is what is causing your performance issues, because the channel is heavily I/O bound. If you take a series of thread dumps, you will probably see that these threads are making progress, and that the threads reading from or writing to disk are the slow ones. The locks themselves are unlikely to hurt performance much.
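To see why that lock is needed, here is a minimal sketch, not Flume's actual implementation (the class name QueueLockDemo and the plain ArrayDeque are illustrative assumptions): if removeHead() were not synchronized, two sink threads could poll the same event pointer and the event would be delivered twice.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

public class QueueLockDemo {
    private final Queue<Long> pointers = new ArrayDeque<>();

    public QueueLockDemo(int n) {
        for (long i = 0; i < n; i++) pointers.add(i);
    }

    // Plays the role of FlumeEventQueue.removeHead: the synchronized
    // keyword is what guarantees each pointer is taken exactly once.
    public synchronized Long removeHead() {
        return pointers.poll();
    }

    public static void main(String[] args) throws Exception {
        QueueLockDemo q = new QueueLockDemo(10_000);
        Set<Long> taken = Collections.synchronizedSet(new HashSet<>());
        Runnable sink = () -> {
            Long p;
            while ((p = q.removeHead()) != null) {
                // add() returns false if another sink already took p
                if (!taken.add(p)) throw new IllegalStateException("duplicate take: " + p);
            }
        };
        // Eight competing "sinks", mirroring the 8 Avro sinks per channel below
        Thread[] sinks = new Thread[8];
        for (int i = 0; i < sinks.length; i++) { sinks[i] = new Thread(sink); sinks[i].start(); }
        for (Thread t : sinks) t.join();
        System.out.println("events taken exactly once: " + taken.size());
    }
}
```

The threads do contend on the monitor, exactly as in the dump below, but every event is taken once; removing the synchronization trades that short wait for duplicated or lost events.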

On Tuesday, August 13, 2013 at 4:13 PM, Pankaj Gupta wrote:

> Hi,
> Spent some more time debugging issues with FileChannel. The problem seems to be lock contention when reading from the FlumeEventQueue:
> I see a lot of threads like this:
> "SinkRunner-PollingRunner-LoadBalancingSinkProcessor" prio=10 tid=0x00007f857b378800 nid=0x404d waiting for monitor entry [0x00007f821afee000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at org.apache.flume.channel.file.FlumeEventQueue.removeHead(FlumeEventQueue.java:117)
>         - waiting to lock <0x0000000518ee4c90> (a org.apache.flume.channel.file.FlumeEventQueue)
>         at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doTake(FileChannel.java:492)
>         at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
>         at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
>         at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:330)
>         at org.apache.flume.sink.LoadBalancingSinkProcessor.process(LoadBalancingSinkProcessor.java:154)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:662)
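The series of thread dumps suggested above can also be approximated in-process with the standard ThreadMXBean API. This sketch (the holder/blocked-taker thread names are made up for illustration) reports which threads are BLOCKED and which thread owns the contended monitor, the same information the jstack output above shows:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class ContentionSnapshot {
    public static void main(String[] args) throws Exception {
        final Object lock = new Object();
        // One thread holds the monitor for a while...
        Thread holder = new Thread(() -> {
            synchronized (lock) {
                try { Thread.sleep(2000); } catch (InterruptedException ignored) {}
            }
        }, "holder");
        holder.start();
        Thread.sleep(100); // let holder acquire the lock
        // ...so this thread ends up BLOCKED on it, like the sink runners above.
        Thread taker = new Thread(() -> {
            synchronized (lock) { }
        }, "blocked-taker");
        taker.start();
        Thread.sleep(100); // let the taker hit the monitor

        // Snapshot all threads and report the blocked ones and their lock owners.
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        for (ThreadInfo info : mx.getThreadInfo(mx.getAllThreadIds())) {
            if (info != null && info.getThreadState() == Thread.State.BLOCKED) {
                System.out.println(info.getThreadName()
                        + " BLOCKED on " + info.getLockName()
                        + " held by " + info.getLockOwnerName());
            }
        }
        holder.join();
        taker.join();
    }
}
```

Taking several such snapshots a few seconds apart shows whether the blocked threads are cycling through the lock (healthy contention) or stuck behind one slow owner.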
> I have two file channels and 8 Avro sinks per file channel. I added more sinks because they weren't draining fast enough. It appears that each sink sends a batch and then waits for an ack before sending again, so sends are not pipelined; adding more sinks seemed like a good way of getting some parallelism.
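A sketch of that send-then-wait-for-ack behavior (sendBatch, the latency value, and the class name are assumptions for illustration, not Flume code): with one sink the per-batch round trips serialize, while several sinks overlap their waits, which is the parallelism gain described above. The CountDownLatch mirrors the CallFuture.await visible in the stack trace below.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BatchSendDemo {
    // Simulates appendBatch: the ack latch is counted down after "network" latency.
    static CountDownLatch sendBatch(ExecutorService ioPool, long latencyMs) {
        CountDownLatch ack = new CountDownLatch(1);
        ioPool.submit(() -> {
            try { Thread.sleep(latencyMs); } catch (InterruptedException ignored) {}
            ack.countDown();
        });
        return ack;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService ioPool = Executors.newCachedThreadPool();
        int batches = 8;
        long latency = 50;

        // One sink: each send blocks on its ack, so latencies add up.
        long t0 = System.nanoTime();
        for (int i = 0; i < batches; i++) sendBatch(ioPool, latency).await();
        long serialMs = (System.nanoTime() - t0) / 1_000_000;

        // Eight "sinks": sends overlap, acks are awaited concurrently.
        t0 = System.nanoTime();
        CountDownLatch done = new CountDownLatch(batches);
        for (int i = 0; i < batches; i++) {
            new Thread(() -> {
                try { sendBatch(ioPool, latency).await(); } catch (InterruptedException ignored) {}
                done.countDown();
            }).start();
        }
        done.await();
        long parallelMs = (System.nanoTime() - t0) / 1_000_000;

        System.out.println("serial ~" + serialMs + "ms, parallel ~" + parallelMs + "ms");
        System.out.println("parallel faster than serial: " + (parallelMs < serialMs));
        ioPool.shutdown();
    }
}
```

This only buys parallelism on the network round trips, though; all eight sinks still funnel through the same channel lock and the same disk, which is why the reply above expects disk I/O, not the lock, to be the bottleneck.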
> Here's the full stack trace:
> 2013-08-13 15:30:32
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (20.13-b02 mixed mode):
> "Flume Avro RPC Client Call Invoker 1" daemon prio=10 tid=0x00007f834c02c000 nid=0x41a2 waiting on condition [0x00007f8210341000]
>    java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x000000056d932120> (a java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
> at org.apache.avro.ipc.CallFuture.await(CallFuture.java:141)
> at org.apache.avro.ipc.Requestor.request(Requestor.java:150)
> at org.apache.avro.ipc.Requestor.request(Requestor.java:129)
> at org.apache.avro.ipc.specific.SpecificRequestor.invoke(SpecificRequestor.java:68)
> at $Proxy7.appendBatch(Unknown Source)
> at org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:314)
> at org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:310)
> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
> "Attach Listener" daemon prio=10 tid=0x00007f85fc016000 nid=0x41a1 waiting on condition [0x0000000000000000]