Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Plain View
Flume, mail # dev - Re: git commit: FLUME-1572. Add batching support to FILE_ROLL sink.


Copy link to this message
-
Re: git commit: FLUME-1572. Add batching support to FILE_ROLL sink.
Hari Shreedharan 2012-09-13, 01:06
Cherry-picked this to flume-1.3.0.

Thanks,
Hari
--
Hari Shreedharan
On Wednesday, September 12, 2012 at 5:34 PM, [EMAIL PROTECTED] wrote:

> Updated Branches:
> refs/heads/trunk 960d03d8a -> 46659c715
>
>
> FLUME-1572. Add batching support to FILE_ROLL sink.
>
> (Hari Shreedharan via Mike Percy)
>
>
> Project: http://git-wip-us.apache.org/repos/asf/flume/repo
> Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/46659c71
> Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/46659c71
> Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/46659c71
>
> Branch: refs/heads/trunk
> Commit: 46659c7156565778068c012f07301aab426d0728
> Parents: 960d03d
> Author: Mike Percy <[EMAIL PROTECTED] (mailto:[EMAIL PROTECTED])>
> Authored: Wed Sep 12 17:33:16 2012 -0700
> Committer: Mike Percy <[EMAIL PROTECTED] (mailto:[EMAIL PROTECTED])>
> Committed: Wed Sep 12 17:33:16 2012 -0700
>
> ----------------------------------------------------------------------
> .../org/apache/flume/sink/RollingFileSink.java | 47 ++++++++------
> .../org/apache/flume/sink/TestRollingFileSink.java | 3 +
> 2 files changed, 30 insertions(+), 20 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/flume/blob/46659c71/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
> ----------------------------------------------------------------------
> diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
> index e5e97ff..a94eea1 100644
> --- a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
> +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
> @@ -47,6 +47,9 @@ public class RollingFileSink extends AbstractSink implements Configurable {
> private static final Logger logger = LoggerFactory
> .getLogger(RollingFileSink.class);
> private static final long defaultRollInterval = 30;
> + private static final int defaultBatchSize = 100;
> +
> + private int batchSize = defaultBatchSize;
>
> private File directory;
> private long rollInterval;
> @@ -90,6 +93,8 @@ public class RollingFileSink extends AbstractSink implements Configurable {
> this.rollInterval = Long.parseLong(rollInterval);
> }
>
> + batchSize = context.getInteger("sink.batchSize", defaultBatchSize);
> +
> this.directory = new File(directory);
> }
>
> @@ -175,30 +180,32 @@ public class RollingFileSink extends AbstractSink implements Configurable {
>
> try {
> transaction.begin();
> - event = channel.take();
> -
> - if (event != null) {
> - serializer.write(event);
> -
> - /*
> - * FIXME: Feature: Rotate on size and time by checking bytes written and
> - * setting shouldRotate = true if we're past a threshold.
> - */
> -
> - /*
> - * FIXME: Feature: Control flush interval based on time or number of
> - * events. For now, we're super-conservative and flush on each write.
> - */
> - serializer.flush();
> - outputStream.flush();
> - } else {
> - // No events found, request back-off semantics from runner
> - result = Status.BACKOFF;
> + for (int i = 0; i < batchSize; i++) {
> + event = channel.take();
> + if (event != null) {
> + serializer.write(event);
> +
> + /*
> + * FIXME: Feature: Rotate on size and time by checking bytes written and
> + * setting shouldRotate = true if we're past a threshold.
> + */
> +
> + /*
> + * FIXME: Feature: Control flush interval based on time or number of
> + * events. For now, we're super-conservative and flush on each write.
> + */
> + } else {
> + // No events found, request back-off semantics from runner
> + result = Status.BACKOFF;
> + break;
> + }
> }
> + serializer.flush();
> + outputStream.flush();
> transaction.commit();
> } catch (Exception ex) {
> transaction.rollback();
> - throw new EventDeliveryException("Failed to process event: " + event, ex);
> + throw new EventDeliveryException("Failed to process transaction", ex);