Flume, mail # user - flume-ng cassandra sink problem..


Re: flume-ng cassandra sink problem..
Priyanka jain 2012-12-26, 18:05
Hi Nitin,
Thanks for your suggestion.

I have done everything as described in the README, but I can't figure out
where I can set that key.
Can you please give me an idea of where I can configure it, or where it
gets generated?

On Wed, Dec 26, 2012 at 11:22 PM, Nitin Pawar <[EMAIL PROTECTED]> wrote:

> From the README, you need the following:
>
> The Sink expects several Flume event headers to be present:
>
>    - key - used (combined with src) to create the Cassandra row key. It
>    should be generated by the application doing the logging.
>    - timestamp - timestamp of when the log occurred, not necessarily when
>    the Flume event is created.
>    - src - a logical source of the Flume event. Could be host, but
>    probably you will have many hosts for a source. A more likely candidate for
>    source is the name of the application.
>    - host - the name of the host where the message was generated.
>
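The README's answer to "where does the key come from" is that the application doing the logging generates it. The posted config uses an exec source (`tail -f`), which turns each line into an event body without headers, and the timestamp and host interceptors do not add `key` or `src`. Below is a minimal sketch of the application side, assuming the application builds the header map itself before sending each event to Flume. The class and method names (`LogEventHeaders`, `build`, `missing`), the random UUID used as `key`, and epoch milliseconds for `timestamp` are hypothetical choices, not taken from the sink; how the sink combines `key` and `src` into the row key is not shown.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical application-side helper for the headers listed in the sink README.
public class LogEventHeaders {

    // Header names from the README quoted above.
    static final String[] REQUIRED = {"key", "timestamp", "src", "host"};

    // Builds the headers for one log record. Using a random UUID as "key" and
    // epoch milliseconds as "timestamp" are assumptions, not sink requirements.
    static Map<String, String> build(String appName, long logTimeMillis) {
        Map<String, String> headers = new HashMap<>();
        headers.put("key", UUID.randomUUID().toString());
        headers.put("timestamp", Long.toString(logTimeMillis)); // when the log occurred
        headers.put("src", appName);          // logical source, e.g. the application name
        headers.put("host", localHostName()); // host where the message was generated
        return headers;
    }

    static String localHostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            return "unknown-host";
        }
    }

    // Returns the required header names that are absent or empty, in README order.
    static List<String> missing(Map<String, String> headers) {
        List<String> result = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = headers.get(name);
            if (value == null || value.isEmpty()) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // An exec-source event that only carries interceptor-added headers.
        Map<String, String> intercepted = new HashMap<>();
        intercepted.put("timestamp", Long.toString(System.currentTimeMillis()));
        intercepted.put("host", localHostName());
        System.out.println("interceptors only, missing: " + missing(intercepted)); // [key, src]

        Map<String, String> fromApp = build("my-app", System.currentTimeMillis());
        System.out.println("application-built, missing: " + missing(fromApp));     // []
    }
}
```

To actually deliver such headers, the application would have to send events itself rather than rely on `tail -f`, for example to the commented-out avro source (port 41414), attaching the map with `EventBuilder.withBody(byte[], Map<String, String>)` from the Flume client SDK. If the exec source has to stay, a custom interceptor that sets `key` and `src` is the other option.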
> On Wed, Dec 26, 2012 at 11:09 PM, Priyanka jain <[EMAIL PROTECTED]> wrote:
>
>> Hi,
>>
>> I am working on a POC of Cassandra/Flume integration. For that I am
>> using the flume-ng Cassandra sink plugin from GitHub, with:
>>
>>    - Flume NG version 1.2.0
>>    - Apache Cassandra version 1.1.5
>>
>> I have built the jar using Maven and am using the sink configuration
>> below in flume.conf.cassandra in the conf directory:
>>
>> agent.sources = avrosource
>> agent.channels = channel1
>> agent.sinks = cassandraSink
>>
>> #source definition
>> agent.sources.avrosource.channels = channel1
>> agent.sources.avrosource.type = exec
>> agent.sources.avrosource.command = tail -f /home/user/priyanka/flume-ng/flnginput.txt
>>
>> #agent.sources.avrosource.type = avro
>> #agent.sources.avrosource.channels = channel1
>> #agent.sources.avrosource.bind =127.0.0.1
>> #agent.sources.avrosource.port =41414
>>
>> #Flume header event
>> agent.sources.avrosource.interceptors = addHost
>> agent.sources.avrosource.interceptors.addHost.type = org.apache.flume.interceptor.HostInterceptor$Builder
>> agent.sources.avrosource.interceptors.addHost.preserveExisting = false
>> agent.sources.avrosource.interceptors.addHost.useIP = false
>> agent.sources.avrosource.interceptors.addHost.hostHeader = host
>> agent.sources.avrosource.interceptors = addTimestamp
>> agent.sources.avrosource.interceptors.addTimestamp.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
>>
>> # Cassandra flow
>> agent.channels.channel1.type = FILE
>> agent.channels.channel1.checkpointDir = file-channel1/check
>> agent.channels.channel1.dataDirs = file-channel1/data
>>
>> agent.sinks.cassandraSink.channel = channel1
>> agent.sinks.cassandraSink.type = com.btoddb.flume.sinks.cassandra.CassandraSink
>> agent.sinks.cassandraSink.hosts = localhost
>> agent.sinks.cassandraSink.port = 9160
>> agent.sinks.cassandraSink.keyspace-name  = logs
>> agent.sinks.cassandraSink.records-colfam  = records
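Separately from the missing `key`, the config above assigns `agent.sources.avrosource.interceptors` twice. Assuming the file is read with `java.util.Properties`, as Flume 1.x's properties-file configuration provider does, the last assignment of a key wins, so only `addTimestamp` would be active and no `host` header would be added. Flume expects several interceptors as one space-separated list. This small sketch (the class and method names are hypothetical) loads both variants with `Properties` and prints what each resolves to.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical demo: how java.util.Properties resolves a key assigned twice.
public class InterceptorOverride {

    static String resolveInterceptors(String configText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(configText));
        } catch (IOException e) {
            // StringReader does no real I/O, so this is not expected here.
            throw new UncheckedIOException(e);
        }
        return props.getProperty("agent.sources.avrosource.interceptors");
    }

    public static void main(String[] args) {
        // The two assignments as they appear in the posted config.
        String posted =
              "agent.sources.avrosource.interceptors = addHost\n"
            + "agent.sources.avrosource.interceptors = addTimestamp\n";
        // Both interceptor names on one line, space-separated.
        String combined =
              "agent.sources.avrosource.interceptors = addHost addTimestamp\n";

        System.out.println(resolveInterceptors(posted));   // addTimestamp
        System.out.println(resolveInterceptors(combined)); // addHost addTimestamp
    }
}
```

So the intended line is `agent.sources.avrosource.interceptors = addHost addTimestamp`, with the per-interceptor `.type` and other settings left as they are. This does not fix the `key` error on its own, since neither interceptor sets `key` or `src`.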
>>
>> I am running this using the command:
>>
>> flume-ng agent -n agent -c /usr/lib/flume-ng-1.2/conf/ -f
>> /usr/lib/flume-ng-1.2/conf/flume.conf.cassandra
>> -Dflume.root.logger=DEBUG,console
>>
>>
>> I got the following error while running it:
>>
>> 2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:193)] exception while processing in Cassandra Sink
>> java.lang.IllegalArgumentException: Missing flume header attribute, 'key' - cannot process this event
>>         at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125)
>>         at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166)
>>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)