flume-ng cassandra sink problem..
Hi,

I am working on a proof of concept (POC) for Cassandra-Flume integration. For that I am
using the Cassandra sink plugin from GitHub (flume-ng Cassandra sink plugin), with:

Flume-NG version: 1.2.0
Apache Cassandra version: 1.1.5

I have built the jar using Maven and am using the sink configuration below,
in flume.conf.cassandra in the conf directory:

agent.sources = avrosource
agent.channels = channel1
agent.sinks = cassandraSink

# source definition
agent.sources.avrosource.channels = channel1
agent.sources.avrosource.type = exec
agent.sources.avrosource.command = tail -f /home/user/priyanka/flume-ng/flnginput.txt

#agent.sources.avrosource.type = avro
#agent.sources.avrosource.channels = channel1
#agent.sources.avrosource.bind = 127.0.0.1
#agent.sources.avrosource.port = 41414

# Flume header event
agent.sources.avrosource.interceptors = addHost
agent.sources.avrosource.interceptors.addHost.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent.sources.avrosource.interceptors.addHost.preserveExisting = false
agent.sources.avrosource.interceptors.addHost.useIP = false
agent.sources.avrosource.interceptors.addHost.hostHeader = host
agent.sources.avrosource.interceptors = addTimestamp
agent.sources.avrosource.interceptors.addTimestamp.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# Cassandra flow
agent.channels.channel1.type = FILE
agent.channels.channel1.checkpointDir = file-channel1/check
agent.channels.channel1.dataDirs = file-channel1/data

agent.sinks.cassandraSink.channel = channel1
agent.sinks.cassandraSink.type = com.btoddb.flume.sinks.cassandra.CassandraSink
agent.sinks.cassandraSink.hosts = localhost
agent.sinks.cassandraSink.port = 9160
agent.sinks.cassandraSink.keyspace-name = logs
agent.sinks.cassandraSink.records-colfam = records
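
A side note on the configuration above: the property agent.sources.avrosource.interceptors is assigned twice. Assuming the agent reads this file with java.util.Properties (an assumption about Flume's properties-file loading, not something stated in this thread), the second assignment replaces the first, so only addTimestamp would be active. The Flume user guide lists multiple interceptors as a single space-separated value. The sketch below only demonstrates the parsing behaviour; the class name InterceptorConfigCheck and the method parse are made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class InterceptorConfigCheck {
    // Parses a properties snippet exactly as java.util.Properties would.
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String name = "agent.sources.avrosource.interceptors";

        // Two separate assignments, as in the posted configuration.
        String separate = name + " = addHost\n" + name + " = addTimestamp\n";

        // One assignment listing both interceptor names, separated by a space.
        String combined = name + " = addHost addTimestamp\n";

        System.out.println(parse(separate).getProperty(name)); // prints "addTimestamp"
        System.out.println(parse(combined).getProperty(name)); // prints "addHost addTimestamp"
    }
}
```

If both interceptors are wanted, listing them on one line is probably the safer form. Neither of them sets a header named 'key', though.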

I am running this using the command:

flume-ng agent -n agent -c /usr/lib/flume-ng-1.2/conf/ \
    -f /usr/lib/flume-ng-1.2/conf/flume.conf.cassandra \
    -Dflume.root.logger=DEBUG,console

I got the following error while running it:

2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:193)] exception while processing in Cassandra Sink
java.lang.IllegalArgumentException: Missing flume header attribute, 'key' - cannot process this event
        at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125)
        at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:722)
2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to persist message
        at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:194)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.IllegalArgumentException: Missing flume header attribute, 'key' - cannot process this event
        at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125)
        at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166)
        ... 3 more

I found one suggested solution, that the key is Src+Key, but I do not understand how to configure it.
Can anyone please help me solve this problem?
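
To make the error concrete: the exception message says the sink refuses any event whose headers lack an entry named 'key'. The host and timestamp interceptors named in the configuration write headers called host and timestamp, so nothing in this setup supplies 'key'. The following is a minimal plain-Java sketch of that check, modelling event headers as a Map. It is not the plugin's code: the class KeyHeaderSketch, the methods requireKey and withKey, and the sample header values are all hypothetical, and the format the sink expects for the key value is not specified here.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyHeaderSketch {
    // Hypothetical stand-in for the sink's check; it only mirrors the
    // wording of the exception in the log, not the plugin's actual code.
    public static void requireKey(Map<String, String> headers) {
        if (!headers.containsKey("key")) {
            throw new IllegalArgumentException(
                "Missing flume header attribute, 'key' - cannot process this event");
        }
    }

    // Hypothetical helper: returns a copy of the headers with a 'key'
    // entry added when none is present. The input map is not modified.
    public static Map<String, String> withKey(Map<String, String> headers, String keyValue) {
        Map<String, String> copy = new HashMap<>(headers);
        copy.putIfAbsent("key", keyValue);
        return copy;
    }

    public static void main(String[] args) {
        // Headers of the kind the host and timestamp interceptors add
        // (sample values, made up for this sketch).
        Map<String, String> headers = new HashMap<>();
        headers.put("host", "myhost");
        headers.put("timestamp", "1356079027743");

        try {
            requireKey(headers);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // The key value here is an arbitrary placeholder.
        Map<String, String> fixed = withKey(headers, "myhost-1356079027743");
        requireKey(fixed);
        System.out.println("accepted with key=" + fixed.get("key"));
    }
}
```

In a running agent the header would have to be set before the event reaches the sink, for example by whatever produces the events or by an interceptor on the source; which of those fits this plugin is an open question in this post.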
Nitin Pawar 2012-12-26, 17:52
Priyanka Jain 2012-12-26, 18:05
Nitin Pawar 2012-12-26, 18:20
Priyanka Jain 2012-12-27, 06:52
Priyanka Jain 2012-12-27, 06:54