Flume, mail # user - Custom Sink


Re: Custom Sink
Erik Bertrand 2013-03-11, 14:24
I wrote a custom TCP sink recently and would be happy to share the code.
It's based on the Logger sink (Brock Noland gave me a helpful link to the
source:
https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java).
My code is quite simple and pretty bare-bones at this point. I'm planning to
make it more full-featured and robust, and will eventually put it on
GitHub. Message me in the interim.

Erik

On Mon, Mar 11, 2013 at 2:06 AM, Hari Shreedharan <[EMAIL PROTECTED]> wrote:

> Hi Vikram,
>
> I cannot be sure why that is happening. Channel.take() gets called even
> if there are no events in the channel. If take() returns null, the channel
> has no events. You can return Status.BACKOFF to tell the sink poller not
> to retry immediately, but the SinkRunner will eventually poll the sink
> again. This is because the SinkRunner does not know the state of the
> channel; by calling process(), it lets the sink take events if any have
> arrived. Generally, sinks call Channel.take() in a loop to build a batch.
> If the batch is non-empty, the sink returns Status.READY; otherwise (that
> is, the batch is empty) it returns Status.BACKOFF. See the code from
> AvroSink as an example (I have taken out some error-handling and
> counter-handling code and added some comments):
>
>       Status status = Status.READY;
>       transaction.begin();
>       for (int i = 0; i < client.getBatchSize(); i++) {
>         Event event = channel.take(); // Take an event from the channel.
>
>         if (event == null) { // Channel returned null: no more events.
>           break;
>         }
>         batch.add(event);
>       }
>
>       int size = batch.size();
>
>       if (size == 0) {
>         // The batch was empty, so back off and try again later.
>         status = Status.BACKOFF;
>       } else {
>         // The batch was not empty, so don't back off; try again immediately.
>         client.appendBatch(batch);
>       }
>       transaction.commit();
>       transaction.close();
>
>       return status;
>
> Another thing you could do is take something like
> AvroSink/AbstractRpcSink, rip out all of the Avro/RPC code, and insert
> your own logic without changing much of the channel/transaction handling.
>
> Hope this helps.
>
>
> Hari
>
>
> On Sun, Mar 10, 2013 at 8:52 PM, Vikram Kulkarni <[EMAIL PROTECTED]> wrote:
>
>> I am trying to write a custom sink for flume-ng. I looked at the existing
>> sinks and documentation and coded it up. However, the process() method
>> that is supposed to receive the events always ends up with a null event. I
>> am calling Event event = channel.take(); but the event is null. I see in
>> the logs that this method is called repeatedly, as the event is still in
>> the channel, so I think the event is reaching the sink but the sink is
>> unable to take it out of the channel.
>>
>> Can someone point me in the right direction?
>>
>> Thanks,
>>
>> Vikram
>
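
The READY/BACKOFF behaviour described in Hari's reply can be tried out
without a Flume installation. The following is only a minimal sketch under
stated assumptions: QueueChannel, CollectingSink and the Status enum are
hypothetical stand-ins that mimic the shape of Flume's Channel, Sink and
Sink.Status, they are not the real org.apache.flume classes, and transaction
handling is left out entirely. It shows why a null from take() on an empty
channel is expected, and how the same process() call picks up events once
they arrive.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Stand-in types only: these imitate the shape of the Flume API for
// illustration and are NOT the real org.apache.flume classes.
final class SinkSketch {
    enum Status { READY, BACKOFF }

    /** Hypothetical in-memory channel; take() returns null when empty. */
    static final class QueueChannel {
        private final Queue<String> events = new ArrayDeque<>();

        void put(String event) { events.add(event); }

        String take() { return events.poll(); }
    }

    /** Hypothetical sink that "delivers" events by collecting them in a list. */
    static final class CollectingSink {
        private final QueueChannel channel;
        private final int batchSize;
        final List<String> delivered = new ArrayList<>();

        CollectingSink(QueueChannel channel, int batchSize) {
            this.channel = channel;
            this.batchSize = batchSize;
        }

        /** Takes up to batchSize events; reports BACKOFF if none were available. */
        Status process() {
            List<String> batch = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                String event = channel.take();
                if (event == null) { // channel is empty, stop filling the batch
                    break;
                }
                batch.add(event);
            }
            if (batch.isEmpty()) {
                return Status.BACKOFF; // nothing to do, ask the poller to wait
            }
            delivered.addAll(batch);
            return Status.READY; // work was done, poll again right away
        }
    }

    public static void main(String[] args) {
        QueueChannel channel = new QueueChannel();
        CollectingSink sink = new CollectingSink(channel, 2);

        System.out.println(sink.process()); // empty channel: BACKOFF
        channel.put("a");
        channel.put("b");
        channel.put("c");
        System.out.println(sink.process()); // takes "a" and "b": READY
        System.out.println(sink.process()); // takes "c" only: READY
        System.out.println(sink.process()); // empty again: BACKOFF
    }
}
```

In a real sink the take() calls would sit between transaction.begin() and
transaction.commit(), as in the AvroSink excerpt quoted above.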