Flume, mail # dev - Review Request 15779: Flume-2245 HDFS Sink BucketWriter failing to close after datanode issues


Re: Review Request 15779: Flume-2245 HDFS Sink BucketWriter failing to close after datanode issues
Juhani Connolly 2013-11-26, 02:24


> On Nov. 25, 2013, 8:07 p.m., Hari Shreedharan wrote:
> > Juhani,
> >
> > I don't mind the idea of this patch, but I am not sure this works. If the flush fails, we ignore the failure and set isOpen to false, so we are not going to try to close the file again (barring retries, which are possible in newer versions of Hadoop and Flume). If we don't try to close or flush again, the data could still be stuck in the local buffers and we risk data loss. The real issue is that there is some underlying failure in HDFS, which should be handled.
> >
> > I think the real solution is to retry closes, which we added in Flume a few months back, provided that version of HDFS supports it. Does that make sense?

If you look at the logs I attached to the JIRA, you will note that certain lines are missing: for example, the output from

LOG.warn("failed to close() HDFSWriter for file (" + bucketPath + "). Exception follows.", e);

I'm pretty confident this is because the IOException is occurring in the flush() at the beginning of BucketWriter.close(). The close() is called after writer.append() fails, and that failure also gets logged as expected. There is no retry logic surrounding the flush, so the exception occurs but none of the retry logic is entered (the retry logic comes in after the flush).

I've annotated the most relevant code in BucketWriter.append() and close(); these excerpts are from the version we had installed. I compared them to the most recent source, and based on the log output it's not entering the code path where the close retries occur.

    // write the event
    try {
      sinkCounter.incrementEventDrainAttemptCount();
      callWithTimeout(new CallRunner<Void>() {
        @Override
        public Void call() throws Exception {
          writer.append(event); // could block  <-- this throws the exception
          return null;
        }
      });
    } catch (IOException e) {
      LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" +
          bucketPath + ") and rethrowing exception.",
          e.getMessage());
      try {
        close();                           <-- this fails at the flush, returning the exception caused in the executor thread
      } catch (IOException e2) {
        LOG.warn("Caught IOException while closing file (" +
             bucketPath + "). Exception follows.", e2);
      }
      throw e;
    }

=========================================================
  public synchronized void close() throws IOException, InterruptedException {
    checkAndThrowInterruptedException();
    flush();   <-- this is where the failure occurs
    LOG.debug("Closing {}", bucketPath);
    if (isOpen) {
      try {                     <-------- beyond here is where newer versions add in the close retries. If they don't get past the flush, that logic doesn't get a chance to kick in
        callWithTimeout(new CallRunner<Void>() {
          @Override
          public Void call() throws Exception {
            writer.close(); // could block
            return null;
          }
        });
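
To make the proposed change concrete, here is a minimal sketch of the kind of guard the patch is describing: catch and log a flush() failure inside close() so that writer.close() (and the close-retry logic in newer versions) still gets a chance to run. This is not the actual FLUME-2245 diff and is simplified from the real BucketWriter (which also handles counters, renaming, and callbacks); the warning message and fall-through handling here are illustrative only, reusing the fields already shown above (writer, bucketPath, isOpen, LOG, callWithTimeout).

    public synchronized void close() throws IOException, InterruptedException {
      checkAndThrowInterruptedException();
      try {
        flush();   // previously an exception here aborted the whole close()
      } catch (IOException e) {
        // Sketch only: log the flush failure and fall through so the writer
        // still gets closed and the file can be reopened later.
        LOG.warn("Pre-close flush of file (" + bucketPath + ") failed. " +
            "Attempting to close anyway.", e);
      }
      LOG.debug("Closing {}", bucketPath);
      if (isOpen) {
        try {
          callWithTimeout(new CallRunner<Void>() {
            @Override
            public Void call() throws Exception {
              writer.close(); // could block
              return null;
            }
          });
        } catch (IOException e) {
          LOG.warn("failed to close() HDFSWriter for file (" + bucketPath +
              "). Exception follows.", e);
        }
        isOpen = false;
      }
    }
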
- Juhani
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15779/#review29389
-----------------------------------------------------------
On Nov. 22, 2013, 8:38 a.m., Juhani Connolly wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/15779/
> -----------------------------------------------------------
>
> (Updated Nov. 22, 2013, 8:38 a.m.)
>
>
> Review request for Flume.
>
>
> Repository: flume-git
>
>
> Description
> -------
>
> https://issues.apache.org/jira/browse/FLUME-2245
>
> Originally the flush() seemed superfluous; however, without it one of the unit tests breaks.
>
> By moving on regardless of whether the flush succeeds, we allow the backing stream to actually get closed and reopened. While the real problem is that the HDFS stream does not recover, this workaround seems necessary, as otherwise appends will continue to fail until a restart.