Flume, mail # user - Custom sink - "close() called when transaction is OPEN" error


Re: Custom sink - "close() called when transaction is OPEN" error
Roshan Naik 2012-11-15, 19:18
Brock, I assume you mean "abort" when you say "destroy". If so, yes, I too
think that closing should abort an uncommitted transaction. I had the same
thought when reading through the memory channel implementation.

-roshan
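
To make the two close() behaviours under discussion concrete, here is a minimal, self-contained sketch. It is not Flume code: ToyTransaction and closeAborting() are hypothetical names invented for illustration, and the state names are only loosely modelled on the BasicTransactionSemantics file linked in the quoted message below. The strict close() refuses to close an open transaction, which is the error Andrew is seeing; the lenient variant rolls back first, which is roughly what an "abort on close" policy would look like.

```java
import java.util.ArrayList;
import java.util.List;

public class AbortOnCloseSketch {
    // Simplified lifecycle states (assumption: a toy model, not the real Flume enum).
    enum State { NEW, OPEN, COMPLETED, CLOSED }

    /** Hypothetical stand-in for a channel transaction. */
    static class ToyTransaction {
        State state = State.NEW;
        final List<String> actions = new ArrayList<>();

        void begin() {
            require(state == State.NEW, "begin()");
            state = State.OPEN;
        }

        void commit() {
            require(state == State.OPEN, "commit()");
            actions.add("commit");
            state = State.COMPLETED;
        }

        void rollback() {
            require(state == State.OPEN, "rollback()");
            actions.add("rollback");
            state = State.COMPLETED;
        }

        /** Strict close: fails if the transaction was neither committed nor rolled back. */
        void close() {
            require(state == State.NEW || state == State.COMPLETED, "close()");
            state = State.CLOSED;
        }

        /** Lenient close (hypothetical): abort any uncommitted work, then close. */
        void closeAborting() {
            if (state == State.OPEN) {
                rollback();
            }
            close();
        }

        private void require(boolean ok, String call) {
            if (!ok) {
                throw new IllegalStateException(call + " called when transaction is " + state);
            }
        }
    }

    public static void main(String[] args) {
        // Strict policy: closing an OPEN transaction throws.
        ToyTransaction strict = new ToyTransaction();
        strict.begin();
        try {
            strict.close();
        } catch (IllegalStateException e) {
            System.out.println("strict close failed: " + e.getMessage());
        }

        // Lenient policy: closing an OPEN transaction rolls it back first.
        ToyTransaction lenient = new ToyTransaction();
        lenient.begin();
        lenient.closeAborting();
        System.out.println("lenient close: state=" + lenient.state + ", actions=" + lenient.actions);
    }
}
```

The trade-off is that the lenient variant hides a missing commit or rollback in the caller, while the strict variant surfaces it loudly at the cost of leaving the transaction unclosed.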

On Thu, Nov 15, 2012 at 4:50 AM, Brock Noland <[EMAIL PROTECTED]> wrote:

> Can you log the Throwable as the first thing in the catch block to see
> whether something is being thrown and, if so, what it is?
>
> Transactions are thread local, so if for some reason the sequencing
> gets messed up on an earlier call to process, every call on the
> transaction will throw an exception, including begin.
>
>
> https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java
>
> As I stated in FLUME-1089 I think that when close is called it should
> forcefully destroy the transaction like JDBC close() but I have not
> got much agreement.
>
>
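
The thread-local point in the message above can be sketched with a toy model. This is an illustration under stated assumptions, not Flume's implementation: ToyChannel and ToyTransaction are hypothetical, and the sketch assumes the channel hands back the thread's existing transaction until that transaction has been closed. Under that assumption, one call that leaves a transaction open poisons every later call on the same thread, starting with begin().

```java
import java.util.ArrayList;
import java.util.List;

public class StuckTransactionSketch {
    enum State { NEW, OPEN, COMPLETED, CLOSED }

    /** Hypothetical transaction with strict state checks. */
    static class ToyTransaction {
        State state = State.NEW;

        void begin() { require(state == State.NEW, "begin()"); state = State.OPEN; }
        void commit() { require(state == State.OPEN, "commit()"); state = State.COMPLETED; }
        void rollback() { require(state == State.OPEN, "rollback()"); state = State.COMPLETED; }
        void close() {
            require(state == State.NEW || state == State.COMPLETED, "close()");
            state = State.CLOSED;
        }

        private void require(boolean ok, String call) {
            if (!ok) {
                throw new IllegalStateException(call + " called when transaction is " + state);
            }
        }
    }

    /** Hypothetical channel: one transaction per thread, replaced only once it is CLOSED. */
    static class ToyChannel {
        private final ThreadLocal<ToyTransaction> current = new ThreadLocal<>();

        ToyTransaction getTransaction() {
            ToyTransaction tx = current.get();
            if (tx == null || tx.state == State.CLOSED) {
                tx = new ToyTransaction();
                current.set(tx);
            }
            return tx;
        }
    }

    /** Runs the scenario and returns a line per observation. */
    static List<String> demo() {
        List<String> out = new ArrayList<>();
        ToyChannel channel = new ToyChannel();

        // Earlier call that went wrong: began, but never committed, rolled back, or closed.
        ToyTransaction first = channel.getTransaction();
        first.begin();

        // Next call on the same thread gets the very same, still-open transaction.
        ToyTransaction second = channel.getTransaction();
        out.add("same instance: " + (second == first));
        try {
            second.begin();
        } catch (IllegalStateException e) {
            out.add("begin failed: " + e.getMessage());
        }

        // Finishing the stuck transaction lets the thread obtain a fresh one.
        second.rollback();
        second.close();
        ToyTransaction third = channel.getTransaction();
        out.add("fresh instance: " + (third != first) + ", state=" + third.state);
        return out;
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

This is also why logging the Throwable first in the catch block is useful: it shows whether the failure began in an earlier call rather than in the current one.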
> On Thu, Nov 15, 2012 at 5:24 AM, Andrew Jones <[EMAIL PROTECTED]>
> wrote:
> > We are using Flume 1.2.0. We have a custom source, although it passes
> > through an Avro Sink and Source before getting to the sink. We are now
> > using the memory channel, although had just switched from the JDBC
> > channel when we started seeing these errors, so maybe that's something
> > to do with it?
> >
> > I tried wrapping transaction.rollback(); in a try catch and logging in
> > the catch, but it wasn't called, so I don't think the rollback is
> > throwing an error.
> >
> > I think it may have something to do with switching channels, as right
> > after Flume reloaded the config we started getting errors. I have
> > restarted the flume node manually and we are still getting the error.
> >
> > Thanks,
> > Andrew
> >
> >
> > On 14 November 2012 20:02, Hari Shreedharan <[EMAIL PROTECTED]>
> > wrote:
> >>
> >> Which version of Flume are you using? It looks like the transaction was
> >> never rolled back or committed. It is likely that the rollback method
> >> too threw some exception, and the rollback was not successful. Also,
> >> what channel are you using?
> >>
> >>
> >> Thanks,
> >> Hari
> >>
> >> --
> >> Hari Shreedharan
> >>
> >> On Wednesday, November 14, 2012 at 8:55 AM, Andrew Jones wrote:
> >>
> >> Hi,
> >>
> >> I have a custom sink which has been working fine, but recently I have
> >> started seeing this error in the logs:
> >>
> >> Unable to deliver event. Exception follows.
> >> java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
> >>         at com.google.common.base.Preconditions.checkState(Preconditions.java:176)
> >> ...
> >>
> >>
> >> After having a google and finding
> >> https://issues.apache.org/jira/browse/FLUME-1089, I have double checked
> >> I am using the correct try, catch, finally idiom that other sinks use,
> >> and I seem to be doing the same. I do the following:
> >>
> >> public Status process() throws EventDeliveryException {
> >>   Status status = Status.READY;
> >>
> >>   Channel channel = getChannel();
> >>   Transaction transaction = channel.getTransaction();
> >>
> >>   try {
> >>     transaction.begin();
> >>
> >>     // does a bit of processing and
> >>     // writes out the event to MongoDB
> >>
> >>     transaction.commit();
> >>
> >>   } catch (Throwable t) {
> >>     transaction.rollback();
> >>
> >>     if (t instanceof Error) {
> >>       throw (Error) t;
> >>     } else if (t instanceof EventDeliveryException) {
> >>       throw (EventDeliveryException) t;
> >>     } else if (t instanceof ChannelException) {
> >>       logger.error("Brodie Log Sink " + getName() + ": Unable to get event from"
> >>           + " channel " + channel.getName() + ". Exception follows.", t);
> >>       status = Status.BACKOFF;
> >>     } else {
> >>       throw new EventDeliveryException("Failed to send events", t);
> >>     }
> >>   } finally {
> >>     transaction.close();
> >>   }
> >>
> >>   return status;
> >> }
> >>
> >> }
> >>
> >> All of this code came from looking at other sinks (Avro and HDFS), so I