Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB
 Search Hadoop and all its subprojects:

Switch to Plain View
Flume >> mail # user >> Custom sink - "close() called when transaction is OPEN" error


+
Andrew Jones 2012-11-14, 16:55
+
Roshan Naik 2012-11-14, 19:57
+
Hari Shreedharan 2012-11-14, 20:02
+
Andrew Jones 2012-11-15, 11:24
+
Brock Noland 2012-11-15, 12:50
+
Roshan Naik 2012-11-15, 19:18
+
Andrew Jones 2012-11-16, 15:21
+
Brock Noland 2012-11-16, 15:32
+
Andrew Jones 2012-11-16, 15:45
Copy link to this message
-
Re: Custom sink - "close() called when transaction is OPEN" error
If channel.take() returns null, no commit or rollback is called....

Checkout how logger sink handles this:

https://git-wip-us.apache.org/repos/asf?p=flume.git;a=blob;f=flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java;h=128fa8427af633c0c7c50093f8f6c4ef9bb8ae76;hb=HEAD

brock

On Fri, Nov 16, 2012 at 9:45 AM, Andrew Jones <[EMAIL PROTECTED]> wrote:
> Sure.
>
> Sink: http://pastebin.com/N6zh73hU
> Config: http://pastebin.com/Tc2MH9iV
>
> Thanks.
>
>
> On 16 November 2012 15:32, Brock Noland <[EMAIL PROTECTED]> wrote:
>>
>> Would you be able to send the source of your sink via pastbin in
>> addition to your config?
>>
>> On Fri, Nov 16, 2012 at 9:21 AM, Andrew Jones <[EMAIL PROTECTED]>
>> wrote:
>> > I tried logging the first throwable, but now that is just the
>> > IllegalStateException.
>> >
>> > Today I have been looking at Flume-1.3.0rc3 and I have noticed the same
>> > problem. This is using the Avro source, File channel and our custom
>> > sink.
>> > After Flume reloads its config, the first error message comes when the
>> > Avro
>> > source starts up:
>> >
>> > 16 Nov 2012 16:04:25,237 INFO  [lifecycleSupervisor-1-4]
>> > (org.apache.flume.source.AvroSource.start:142)  - Starting Avro source
>> > source: { bindAddress: 0.0.0.0, port: 36060 }...
>> > 16 Nov 2012 16:04:25,484 ERROR
>> > [SinkRunner-PollingRunner-DefaultSinkProcessor]
>> > (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver
>> > event. Exception follows.
>> > java.lang.IllegalStateException: close() called when transaction is OPEN
>> > -
>> > you must either commit or rollback first
>> >         at
>> > com.google.common.base.Preconditions.checkState(Preconditions.java:176)
>> >         at
>> >
>> > org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
>> >         at
>> >
>> > com.arm.pd.brodie.flume.sink.ResultSink.processSingle(ResultSink.java:440)
>> >         at
>> > com.arm.pd.brodie.flume.sink.ResultSink.process(ResultSink.java:172)
>> >         at
>> >
>> > org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>> >         at
>> > org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>> >         at java.lang.Thread.run(Thread.java:636)
>> > 16 Nov 2012 16:04:25,594 INFO  [lifecycleSupervisor-1-4]
>> > (org.apache.flume.instrumentation.MonitoredCounterGroup.register:89)  -
>> > Monitoried counter group for type: SOURCE, name: source, registered
>> > successfully.
>> > 16 Nov 2012 16:04:25,595 INFO  [lifecycleSupervisor-1-4]
>> > (org.apache.flume.instrumentation.MonitoredCounterGroup.start:73)  -
>> > Component type: SOURCE, name: source started
>> > 16 Nov 2012 16:04:25,595 INFO  [lifecycleSupervisor-1-4]
>> > (org.apache.flume.source.AvroSource.start:168)  - Avro source source
>> > started.
>> >
>> > I then continually get errors from the Sink, presumably as its been
>> > called
>> > periodically to check for events in the channel. So is it possible its
>> > the
>> > Avro source causing the issue?
>> >
>> > There should have been nothing persisted in the file channel when
>> > restarting.
>> >
>> > When the transaction gets messed up like this, is there a way to refresh
>> > it,
>> > preferably without losing any data?
>> >
>> > I am still able to send things to flume and they get processed and
>> > inserted
>> > by my sink, so it still seems to work OK.
>> >
>> > Thanks,
>> > Andrew
>> >
>> >
>> >
>> > On 15 November 2012 12:50, Brock Noland <[EMAIL PROTECTED]> wrote:
>> >>
>> >> Can you log the Throwable as the first thing in the catch block to see
>> >> if something and what it is, is being thrown?
>> >>
>> >> Transactions are thread local so if for some reason the the sequencing
>> >> gets messed up on an earlier call the process, every call to
>> >> transaction will thrown an exception including begin.
>> >>
>> >>
>> >>
>> >> https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java

Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/
+
Andrew Jones 2012-11-16, 16:17
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB