Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB
 Search Hadoop and all its subprojects:

Switch to Threaded View
Flume >> mail # user >> Custom sink - "close() called when transaction is OPEN" error


Copy link to this message
-
Re: Custom sink - "close() called when transaction is OPEN" error
Brock.. I assume you mean "abort" when you say "destroy".. if so yes I too
think that closing should abort an uncommitted transaction. I had the same
thought when reading through the memory channel implementation.

-roshan

On Thu, Nov 15, 2012 at 4:50 AM, Brock Noland <[EMAIL PROTECTED]> wrote:

> Can you log the Throwable as the first thing in the catch block to see
> if something and what it is, is being thrown?
>
> Transactions are thread local so if for some reason the the sequencing
> gets messed up on an earlier call the process, every call to
> transaction will thrown an exception including begin.
>
>
> https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java
>
> As I stated in FLUME-1089 I think that when close is called it should
> forcefully destroy the transaction like JDBC close() but I have not
> got much agreement.
>
>
> On Thu, Nov 15, 2012 at 5:24 AM, Andrew Jones <[EMAIL PROTECTED]>
> wrote:
> > We are using Flume 1.2.0. We have a custom source, although it passes
> > through an Avro Sink and Source before getting to the sink. We are now
> using
> > the memory channel, although had just switched from the JDBC channel
> when we
> > started seeing these errors, so maybe that's something to do with it?
> >
> > I tried wrapping transaction.rollback(); in a try catch and logging in
> the
> > catch, but it wasn't called, so I don't think the rollback is throwing an
> > error.
> >
> > I think it may have something to do with switching channels, as right
> after
> > Flume reloaded the config we started getting errors. I have restarted the
> > flume node manually and we are still getting the error.
> >
> > Thanks,
> > Andrew
> >
> >
> > On 14 November 2012 20:02, Hari Shreedharan <[EMAIL PROTECTED]>
> > wrote:
> >>
> >> Which version of Flume are you using? It looks like the transaction was
> >> never rolled back or committed. It is likely that the rollback method
> too
> >> threw some exception, and the rollback was not successful. Also, what
> >> channel are you using?
> >>
> >>
> >> Thanks,
> >> Hari
> >>
> >> --
> >> Hari Shreedharan
> >>
> >> On Wednesday, November 14, 2012 at 8:55 AM, Andrew Jones wrote:
> >>
> >> Hi,
> >>
> >> I have a custom sink which has been working fine, but recently I have
> >> started seeing this error in the logs:
> >>
> >> Unable to deliver event. Exception follows.
> >> java.lang.IllegalStateException: close() called when transaction is
> OPEN -
> >> you must either commit or rollback first
> >>         at
> >> com.google.common.base.Preconditions.checkState(Preconditions.java:176)
> >> ...
> >>
> >>
> >> After having a google and finding
> >> https://issues.apache.org/jira/browse/FLUME-1089, I have double
> checked I am
> >> using the correct try, catch, finally idiom that other sinks use, and I
> seem
> >> to be doing the same. I do the following:
> >>
> >> public Status process() throws EventDeliveryException {
> >> Status status = Status.READY;
> >>
> >> Channel channel = getChannel();
> >> Transaction transaction = channel.getTransaction();
> >>
> >> try {
> >> transaction.begin();
> >>
> >>                         // does a bit of processing and
> >>                         // writes out the event to MongoDB
> >>
> >>                         transaction.commit();
> >>
> >> } catch (Throwable t) {
> >> transaction.rollback();
> >>
> >> if (t instanceof Error) {
> >> throw (Error) t;
> >> } else if  (t instanceof EventDeliveryException) {
> >> throw (EventDeliveryException) t;
> >> } else if (t instanceof ChannelException) {
> >> logger.error("Brodie Log Sink " + getName() + ": Unable to get event
> from"
> >> +
> >> " channel " + channel.getName() + ". Exception follows.", t);
> >> status = Status.BACKOFF;
> >> } else {
> >> throw new EventDeliveryException("Failed to send events", t);
> >> }
> >> } finally {
> >> transaction.close();
> >> }
> >>
> >> return status;
> >> }
> >>
> >> }
> >>
> >> All of this code came from looking at other sinks (Avro and HDFS), so I
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB