|
|
-
flume-ng cassandra sink problem..
Priyanka jain 2012-12-26, 17:39
HI,
I am working on the POC of Cassandra flume integration. For that am using Cassandra sink plugin from *Github (flume-ng Cassandra sink plugin).* *And * *Flume-NG version-1.2.0* *Apache Cassandra Version :1.1.5* *I *have build the jar using maven and am using sink configuration as below in flume.conf.cassandra in conf directory...
*agent.sources = avrosource*
*agent.channels = channel1*
*agent.sinks = cassandraSink*
* *
*#source defination*
*agent.sources.avrosource.channels = channel1*
*agent.sources.avrosource.type = exec*
*agent.sources.avrosource.command = tail -f /home/user/priyanka/flume-ng/flnginput.txt*
* *
*#agent.sources.avrosource.type = avro*
*#agent.sources.avrosource.channels = channel1*
*#agent.sources.avrosource.bind =127.0.0.1*
*#agent.sources.avrosource.port =41414*
* *
*#Flume header event*
*agent.sources.avrosource.interceptors = addHost*
*agent.sources.avrosource.interceptors.addHost.type org.apache.flume.interceptor.HostInterceptor$Builder*
*agent.sources.avrosource.interceptors.addHost.preserveExisting = false*
*agent.sources.avrosource.interceptors.addHost.useIP = false*
*agent.sources.avrosource.interceptors.addHost.hostHeader = host*
*agent.sources.avrosource.interceptors = addTimestamp*
*agent.sources.avrosource.interceptors.addTimestamp.type org.apache.flume.interceptor.TimestampInterceptor$Builder*
* *
*# Cassandra flow*
*agent.channels.channel1.type = FILE*
*agent.channels.channel1.checkpointDir = file-channel1/check*
*agent.channels.channel1.dataDirs = file-channel1/data*
* *
*agent.sinks.cassandraSink.channel = channel1*
*agent.sinks.cassandraSink.type com.btoddb.flume.sinks.cassandra.CassandraSink*
*agent.sinks.cassandraSink.hosts = localhost*
*agent.sinks.cassandraSink.port = 9160*
*agent.sinks.cassandraSink.keyspace-name = logs*
*agent.sinks.cassandraSink.records-colfam = records*
* *
* *
* *
*Am running this using the command :-*
* *
flume-ng agent -n agent -c /usr/lib/flume-ng-1.2/conf/ -f /usr/lib/flume-ng-1.2/conf/flume.conf.cassandra -Dflume.root.logger=DEBUG,console Got the error while running :-
* *
2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:193)] exception while processing in Cassandra Sink
java.lang.IllegalArgumentException: Missing flume header attribute, 'key' - cannot process this event
at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125)
at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
atjava.lang.Thread.run(Thread.java:722)
2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to persist message
at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:194)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
atjava.lang.Thread.run(Thread.java:722)
Caused by: java.lang.IllegalArgumentException: Missing flume header attribute, 'key' - cannot process this event
at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125)
at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166)
... 3 more
I got one solution as Key is Src+Key but am not getting how to configure it. So can any one please help me out to solve this problem. So
-
Re: flume-ng cassandra sink problem..
Nitin Pawar 2012-12-26, 17:52
from the README you need to have following things in conf
The Sink expects several flume event headers to be present:
- key - used (combined with src) to create the Cassandra row key. It should be generated by the application doing the logging - timestamp - timestamp of when the log occurred, not necessarily when the flume event is created - src - A logical source of the flume event. Could be host, but probably you will have many hosts for a source. A more likely candidate for source is the name of the application - host - the name of the host where the message was generated -
On Wed, Dec 26, 2012 at 11:09 PM, Priyanka jain <[EMAIL PROTECTED]>wrote:
> HI, > > I am working on the POC of Cassandra flume integration. For that am > using Cassandra sink plugin from *Github (flume-ng Cassandra sink plugin). > * > *And * > *Flume-NG version-1.2.0* > *Apache Cassandra Version :1.1.5* > *I *have build the jar using maven and am using sink configuration as > below in flume.conf.cassandra in conf directory... > > *agent.sources = avrosource* > > *agent.channels = channel1* > > *agent.sinks = cassandraSink* > > * * > > *#source defination* > > *agent.sources.avrosource.channels = channel1* > > *agent.sources.avrosource.type = exec* > > *agent.sources.avrosource.command = tail -f > /home/user/priyanka/flume-ng/flnginput.txt* > > * * > > *#agent.sources.avrosource.type = avro* > > *#agent.sources.avrosource.channels = channel1* > > *#agent.sources.avrosource.bind =127.0.0.1* > > *#agent.sources.avrosource.port =41414* > > * * > > *#Flume header event* > > *agent.sources.avrosource.interceptors = addHost* > > *agent.sources.avrosource.interceptors.addHost.type > org.apache.flume.interceptor.HostInterceptor$Builder* > > *agent.sources.avrosource.interceptors.addHost.preserveExisting = false* > > *agent.sources.avrosource.interceptors.addHost.useIP = false* > > *agent.sources.avrosource.interceptors.addHost.hostHeader = host* > > *agent.sources.avrosource.interceptors = addTimestamp* > > *agent.sources.avrosource.interceptors.addTimestamp.type > org.apache.flume.interceptor.TimestampInterceptor$Builder* > > * * > > *# Cassandra flow* > > *agent.channels.channel1.type = FILE* > > *agent.channels.channel1.checkpointDir = file-channel1/check* > > *agent.channels.channel1.dataDirs = file-channel1/data* > > * * > > *agent.sinks.cassandraSink.channel = channel1* > > *agent.sinks.cassandraSink.type > com.btoddb.flume.sinks.cassandra.CassandraSink* > > *agent.sinks.cassandraSink.hosts = localhost* > > *agent.sinks.cassandraSink.port = 9160* > > *agent.sinks.cassandraSink.keyspace-name = logs* > > *agent.sinks.cassandraSink.records-colfam = records* > > * > * > > * > * > > * > * > > *Am running this using the command :-* > > * > * > > flume-ng agent -n agent -c /usr/lib/flume-ng-1.2/conf/ -f > /usr/lib/flume-ng-1.2/conf/flume.conf.cassandra > -Dflume.root.logger=DEBUG,console > > > Got the error while running :- > > * * > > 2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) > [ERROR - > com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:193)] > exception while processing in Cassandra Sink > > java.lang.IllegalArgumentException: Missing flume header attribute, 'key' > - cannot process this event > > at > com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125) > > at > com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166) > > at > org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68) > > at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) > > atjava.lang.Thread.run(Thread.java:722) > > 2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) > [ERROR - > org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable > to deliver event. Exception follows. > > org.apache.flume.EventDeliveryException: Failed to persist message Nitin Pawar
-
Re: flume-ng cassandra sink problem..
Priyanka jain 2012-12-26, 18:05
Hi Nitin, Thanks for your suggestion.
I have done all the thing as in README. but am not getting from where I can set that key. can you please give me idea about from where I can configure it or from where its get generated.
On Wed, Dec 26, 2012 at 11:22 PM, Nitin Pawar <[EMAIL PROTECTED]>wrote:
> from the README > you need to have following things in conf > > The Sink expects several flume event headers to be present: > > - key - used (combined with src) to create the Cassandra row key. It > should be generated by the application doing the logging > - timestamp - timestamp of when the log occurred, not necessarily when > the flume event is created > - src - A logical source of the flume event. Could be host, but > probably you will have many hosts for a source. A more likely candidate for > source is the name of the application > - host - the name of the host where the message was generated > - > > > > On Wed, Dec 26, 2012 at 11:09 PM, Priyanka jain < > [EMAIL PROTECTED]> wrote: > >> HI, >> >> I am working on the POC of Cassandra flume integration. For that am >> using Cassandra sink plugin from *Github (flume-ng Cassandra sink >> plugin).* >> *And * >> *Flume-NG version-1.2.0* >> *Apache Cassandra Version :1.1.5* >> *I *have build the jar using maven and am using sink configuration as >> below in flume.conf.cassandra in conf directory... >> >> *agent.sources = avrosource* >> >> *agent.channels = channel1* >> >> *agent.sinks = cassandraSink* >> >> * * >> >> *#source defination* >> >> *agent.sources.avrosource.channels = channel1* >> >> *agent.sources.avrosource.type = exec* >> >> *agent.sources.avrosource.command = tail -f >> /home/user/priyanka/flume-ng/flnginput.txt* >> >> * * >> >> *#agent.sources.avrosource.type = avro* >> >> *#agent.sources.avrosource.channels = channel1* >> >> *#agent.sources.avrosource.bind =127.0.0.1* >> >> *#agent.sources.avrosource.port =41414* >> >> * * >> >> *#Flume header event* >> >> *agent.sources.avrosource.interceptors = addHost* >> >> *agent.sources.avrosource.interceptors.addHost.type >> org.apache.flume.interceptor.HostInterceptor$Builder* >> >> *agent.sources.avrosource.interceptors.addHost.preserveExisting = false* >> >> *agent.sources.avrosource.interceptors.addHost.useIP = false* >> >> *agent.sources.avrosource.interceptors.addHost.hostHeader = host* >> >> *agent.sources.avrosource.interceptors = addTimestamp* >> >> *agent.sources.avrosource.interceptors.addTimestamp.type >> org.apache.flume.interceptor.TimestampInterceptor$Builder* >> >> * * >> >> *# Cassandra flow* >> >> *agent.channels.channel1.type = FILE* >> >> *agent.channels.channel1.checkpointDir = file-channel1/check* >> >> *agent.channels.channel1.dataDirs = file-channel1/data* >> >> * * >> >> *agent.sinks.cassandraSink.channel = channel1* >> >> *agent.sinks.cassandraSink.type >> com.btoddb.flume.sinks.cassandra.CassandraSink* >> >> *agent.sinks.cassandraSink.hosts = localhost* >> >> *agent.sinks.cassandraSink.port = 9160* >> >> *agent.sinks.cassandraSink.keyspace-name = logs* >> >> *agent.sinks.cassandraSink.records-colfam = records* >> >> * >> * >> >> * >> * >> >> * >> * >> >> *Am running this using the command :-* >> >> * >> * >> >> flume-ng agent -n agent -c /usr/lib/flume-ng-1.2/conf/ -f >> /usr/lib/flume-ng-1.2/conf/flume.conf.cassandra >> -Dflume.root.logger=DEBUG,console >> >> >> Got the error while running :- >> >> * * >> >> 2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) >> [ERROR - >> com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:193)] >> exception while processing in Cassandra Sink >> >> java.lang.IllegalArgumentException: Missing flume header attribute, 'key' >> - cannot process this event >> >> at >> com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125) >> >> at >> com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166) >> >> at >> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
-
Re: flume-ng cassandra sink problem..
Nitin Pawar 2012-12-26, 18:20
can you provide sample line out of your log?
the cassandra sink you are using looks at a particular event headers in your log such as key, src and host On Wed, Dec 26, 2012 at 11:35 PM, Priyanka jain <[EMAIL PROTECTED]>wrote:
> Hi Nitin, > Thanks for your suggestion. > > I have done all the thing as in README. but am not getting from where I > can set that key. > can you please give me idea about from where I can configure it or from > where its get generated. > > > On Wed, Dec 26, 2012 at 11:22 PM, Nitin Pawar <[EMAIL PROTECTED]>wrote: > >> from the README >> you need to have following things in conf >> >> The Sink expects several flume event headers to be present: >> >> - key - used (combined with src) to create the Cassandra row key. It >> should be generated by the application doing the logging >> - timestamp - timestamp of when the log occurred, not necessarily >> when the flume event is created >> - src - A logical source of the flume event. Could be host, but >> probably you will have many hosts for a source. A more likely candidate for >> source is the name of the application >> - host - the name of the host where the message was generated >> - >> >> >> >> On Wed, Dec 26, 2012 at 11:09 PM, Priyanka jain < >> [EMAIL PROTECTED]> wrote: >> >>> HI, >>> >>> I am working on the POC of Cassandra flume integration. For that am >>> using Cassandra sink plugin from *Github (flume-ng Cassandra sink >>> plugin).* >>> *And * >>> *Flume-NG version-1.2.0* >>> *Apache Cassandra Version :1.1.5* >>> *I *have build the jar using maven and am using sink configuration as >>> below in flume.conf.cassandra in conf directory... >>> >>> *agent.sources = avrosource* >>> >>> *agent.channels = channel1* >>> >>> *agent.sinks = cassandraSink* >>> >>> * * >>> >>> *#source defination* >>> >>> *agent.sources.avrosource.channels = channel1* >>> >>> *agent.sources.avrosource.type = exec* >>> >>> *agent.sources.avrosource.command = tail -f >>> /home/user/priyanka/flume-ng/flnginput.txt* >>> >>> * * >>> >>> *#agent.sources.avrosource.type = avro* >>> >>> *#agent.sources.avrosource.channels = channel1* >>> >>> *#agent.sources.avrosource.bind =127.0.0.1* >>> >>> *#agent.sources.avrosource.port =41414* >>> >>> * * >>> >>> *#Flume header event* >>> >>> *agent.sources.avrosource.interceptors = addHost* >>> >>> *agent.sources.avrosource.interceptors.addHost.type >>> org.apache.flume.interceptor.HostInterceptor$Builder* >>> >>> *agent.sources.avrosource.interceptors.addHost.preserveExisting = false* >>> >>> *agent.sources.avrosource.interceptors.addHost.useIP = false* >>> >>> *agent.sources.avrosource.interceptors.addHost.hostHeader = host* >>> >>> *agent.sources.avrosource.interceptors = addTimestamp* >>> >>> *agent.sources.avrosource.interceptors.addTimestamp.type >>> org.apache.flume.interceptor.TimestampInterceptor$Builder* >>> >>> * * >>> >>> *# Cassandra flow* >>> >>> *agent.channels.channel1.type = FILE* >>> >>> *agent.channels.channel1.checkpointDir = file-channel1/check* >>> >>> *agent.channels.channel1.dataDirs = file-channel1/data* >>> >>> * * >>> >>> *agent.sinks.cassandraSink.channel = channel1* >>> >>> *agent.sinks.cassandraSink.type >>> com.btoddb.flume.sinks.cassandra.CassandraSink* >>> >>> *agent.sinks.cassandraSink.hosts = localhost* >>> >>> *agent.sinks.cassandraSink.port = 9160* >>> >>> *agent.sinks.cassandraSink.keyspace-name = logs* >>> >>> *agent.sinks.cassandraSink.records-colfam = records* >>> >>> * >>> * >>> >>> * >>> * >>> >>> * >>> * >>> >>> *Am running this using the command :-* >>> >>> * >>> * >>> >>> flume-ng agent -n agent -c /usr/lib/flume-ng-1.2/conf/ -f >>> /usr/lib/flume-ng-1.2/conf/flume.conf.cassandra >>> -Dflume.root.logger=DEBUG,console >>> >>> >>> Got the error while running :- >>> >>> * * >>> >>> 2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) >>> [ERROR - >>> com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:193)] >>
Nitin Pawar
-
Re: flume-ng cassandra sink problem..
Priyanka Jain 2012-12-27, 06:52
Hi ,
On Wed, Dec 26, 2012 at 11:50 PM, Nitin Pawar <[EMAIL PROTECTED]>wrote:
> can you provide sample line out of your log? > > the cassandra sink you are using looks at a particular event headers in > your log such as key, src and host > > > On Wed, Dec 26, 2012 at 11:35 PM, Priyanka jain < > [EMAIL PROTECTED]> wrote: > >> Hi Nitin, >> Thanks for your suggestion. >> >> I have done all the thing as in README. but am not getting from where I >> can set that key. >> can you please give me idea about from where I can configure it or from >> where its get generated. >> >> >> On Wed, Dec 26, 2012 at 11:22 PM, Nitin Pawar <[EMAIL PROTECTED]>wrote: >> >>> from the README >>> you need to have following things in conf >>> >>> The Sink expects several flume event headers to be present: >>> >>> - key - used (combined with src) to create the Cassandra row key. It >>> should be generated by the application doing the logging >>> - timestamp - timestamp of when the log occurred, not necessarily >>> when the flume event is created >>> - src - A logical source of the flume event. Could be host, but >>> probably you will have many hosts for a source. A more likely candidate for >>> source is the name of the application >>> - host - the name of the host where the message was generated >>> - >>> >>> >>> >>> On Wed, Dec 26, 2012 at 11:09 PM, Priyanka jain < >>> [EMAIL PROTECTED]> wrote: >>> >>>> HI, >>>> >>>> I am working on the POC of Cassandra flume integration. For that am >>>> using Cassandra sink plugin from *Github (flume-ng Cassandra sink >>>> plugin).* >>>> *And * >>>> *Flume-NG version-1.2.0* >>>> *Apache Cassandra Version :1.1.5* >>>> *I *have build the jar using maven and am using sink configuration as >>>> below in flume.conf.cassandra in conf directory... >>>> >>>> *agent.sources = avrosource* >>>> >>>> *agent.channels = channel1* >>>> >>>> *agent.sinks = cassandraSink* >>>> >>>> * * >>>> >>>> *#source defination* >>>> >>>> *agent.sources.avrosource.channels = channel1* >>>> >>>> *agent.sources.avrosource.type = exec* >>>> >>>> *agent.sources.avrosource.command = tail -f >>>> /home/user/priyanka/flume-ng/flnginput.txt* >>>> >>>> * * >>>> >>>> *#agent.sources.avrosource.type = avro* >>>> >>>> *#agent.sources.avrosource.channels = channel1* >>>> >>>> *#agent.sources.avrosource.bind =127.0.0.1* >>>> >>>> *#agent.sources.avrosource.port =41414* >>>> >>>> * * >>>> >>>> *#Flume header event* >>>> >>>> *agent.sources.avrosource.interceptors = addHost* >>>> >>>> *agent.sources.avrosource.interceptors.addHost.type >>>> org.apache.flume.interceptor.HostInterceptor$Builder* >>>> >>>> *agent.sources.avrosource.interceptors.addHost.preserveExisting = false >>>> * >>>> >>>> *agent.sources.avrosource.interceptors.addHost.useIP = false* >>>> >>>> *agent.sources.avrosource.interceptors.addHost.hostHeader = host* >>>> >>>> *agent.sources.avrosource.interceptors = addTimestamp* >>>> >>>> *agent.sources.avrosource.interceptors.addTimestamp.type >>>> org.apache.flume.interceptor.TimestampInterceptor$Builder* >>>> >>>> * * >>>> >>>> *# Cassandra flow* >>>> >>>> *agent.channels.channel1.type = FILE* >>>> >>>> *agent.channels.channel1.checkpointDir = file-channel1/check* >>>> >>>> *agent.channels.channel1.dataDirs = file-channel1/data* >>>> >>>> * * >>>> >>>> *agent.sinks.cassandraSink.channel = channel1* >>>> >>>> *agent.sinks.cassandraSink.type >>>> com.btoddb.flume.sinks.cassandra.CassandraSink* >>>> >>>> *agent.sinks.cassandraSink.hosts = localhost* >>>> >>>> *agent.sinks.cassandraSink.port = 9160* >>>> >>>> *agent.sinks.cassandraSink.keyspace-name = logs* >>>> >>>> *agent.sinks.cassandraSink.records-colfam = records* >>>> >>>> * >>>> * >>>> >>>> * >>>> * >>>> >>>> * >>>> * >>>> >>>> *Am running this using the command :-* >>>> >>>> * >>>> * >>>> >>>> flume-ng agent -n agent -c /usr/lib/flume-ng-1.2/conf/ -f >>>> /usr/lib/flume-ng-1.2/conf/flume.conf.cassandra >>>> -Dflume.root.logger=DEBUG,console
-
Re: flume-ng cassandra sink problem..
Priyanka Jain 2012-12-27, 06:54
On Thu, Dec 27, 2012 at 12:22 PM, Priyanka Jain <[EMAIL PROTECTED]>wrote:
> Hi , > > Sending the some log for the error.In attached file. > > On Wed, Dec 26, 2012 at 11:50 PM, Nitin Pawar <[EMAIL PROTECTED]>wrote: > >> can you provide sample line out of your log? >> >> the cassandra sink you are using looks at a particular event headers in >> your log such as key, src and host >> >> >> On Wed, Dec 26, 2012 at 11:35 PM, Priyanka jain < >> [EMAIL PROTECTED]> wrote: >> >>> Hi Nitin, >>> Thanks for your suggestion. >>> >>> I have done all the thing as in README. but am not getting from where I >>> can set that key. >>> can you please give me idea about from where I can configure it or from >>> where its get generated. >>> >>> >>> On Wed, Dec 26, 2012 at 11:22 PM, Nitin Pawar <[EMAIL PROTECTED]>wrote: >>> >>>> from the README >>>> you need to have following things in conf >>>> >>>> The Sink expects several flume event headers to be present: >>>> >>>> - key - used (combined with src) to create the Cassandra row key. >>>> It should be generated by the application doing the logging >>>> - timestamp - timestamp of when the log occurred, not necessarily >>>> when the flume event is created >>>> - src - A logical source of the flume event. Could be host, but >>>> probably you will have many hosts for a source. A more likely candidate for >>>> source is the name of the application >>>> - host - the name of the host where the message was generated >>>> - >>>> >>>> >>>> >>>> On Wed, Dec 26, 2012 at 11:09 PM, Priyanka jain < >>>> [EMAIL PROTECTED]> wrote: >>>> >>>>> HI, >>>>> >>>>> I am working on the POC of Cassandra flume integration. For that am >>>>> using Cassandra sink plugin from *Github (flume-ng Cassandra sink >>>>> plugin).* >>>>> *And * >>>>> *Flume-NG version-1.2.0* >>>>> *Apache Cassandra Version :1.1.5* >>>>> *I *have build the jar using maven and am using sink configuration as >>>>> below in flume.conf.cassandra in conf directory... >>>>> >>>>> *agent.sources = avrosource* >>>>> >>>>> *agent.channels = channel1* >>>>> >>>>> *agent.sinks = cassandraSink* >>>>> >>>>> * * >>>>> >>>>> *#source defination* >>>>> >>>>> *agent.sources.avrosource.channels = channel1* >>>>> >>>>> *agent.sources.avrosource.type = exec* >>>>> >>>>> *agent.sources.avrosource.command = tail -f >>>>> /home/user/priyanka/flume-ng/flnginput.txt* >>>>> >>>>> * * >>>>> >>>>> *#agent.sources.avrosource.type = avro* >>>>> >>>>> *#agent.sources.avrosource.channels = channel1* >>>>> >>>>> *#agent.sources.avrosource.bind =127.0.0.1* >>>>> >>>>> *#agent.sources.avrosource.port =41414* >>>>> >>>>> * * >>>>> >>>>> *#Flume header event* >>>>> >>>>> *agent.sources.avrosource.interceptors = addHost* >>>>> >>>>> *agent.sources.avrosource.interceptors.addHost.type >>>>> org.apache.flume.interceptor.HostInterceptor$Builder* >>>>> >>>>> *agent.sources.avrosource.interceptors.addHost.preserveExisting >>>>> false* >>>>> >>>>> *agent.sources.avrosource.interceptors.addHost.useIP = false* >>>>> >>>>> *agent.sources.avrosource.interceptors.addHost.hostHeader = host* >>>>> >>>>> *agent.sources.avrosource.interceptors = addTimestamp* >>>>> >>>>> *agent.sources.avrosource.interceptors.addTimestamp.type >>>>> org.apache.flume.interceptor.TimestampInterceptor$Builder* >>>>> >>>>> * * >>>>> >>>>> *# Cassandra flow* >>>>> >>>>> *agent.channels.channel1.type = FILE* >>>>> >>>>> *agent.channels.channel1.checkpointDir = file-channel1/check* >>>>> >>>>> *agent.channels.channel1.dataDirs = file-channel1/data* >>>>> >>>>> * * >>>>> >>>>> *agent.sinks.cassandraSink.channel = channel1* >>>>> >>>>> *agent.sinks.cassandraSink.type >>>>> com.btoddb.flume.sinks.cassandra.CassandraSink* >>>>> >>>>> *agent.sinks.cassandraSink.hosts = localhost* >>>>> >>>>> *agent.sinks.cassandraSink.port = 9160* >>>>> >>>>> *agent.sinks.cassandraSink.keyspace-name = logs* >>>>> >>>>> *agent.sinks.cassandraSink.records-colfam = records* >>>>> >>>>> *
|
|