Pig >> mail # user >> Pig + Cassandra Example !!


Re: Pig + Cassandra Example !!
Try fully qualifying CassandraStorage() as
org.apache.cassandra.hadoop.pig.CassandraStorage().
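The error message lists the prefixes Pig tried when resolving the name: an empty prefix, `org.apache.pig.builtin.`, and `org.apache.pig.impl.builtin.`. A bare `CassandraStorage` matches none of them. A fully qualified name matches the empty prefix as-is, provided the jar that contains the class is on Pig's classpath (for example via `REGISTER`). The following is a simplified Java model of that lookup, not Pig's actual resolver. The `ImportResolver` class and its `resolve` method are hypothetical, and `java.lang.StringBuilder` stands in for a fully qualified class that is known to be on the classpath:

```java
import java.util.List;
import java.util.Optional;

// Simplified model of resolving a function name against import prefixes.
// The prefix list is copied from the ERROR 1070 message; this is NOT Pig's
// resolver code, only an illustration of why a bare name fails.
public class ImportResolver {
    static final List<String> DEFAULT_IMPORTS =
            List.of("", "org.apache.pig.builtin.", "org.apache.pig.impl.builtin.");

    // Try each prefix in order and return the first class that loads.
    static Optional<Class<?>> resolve(String name, List<String> imports) {
        for (String prefix : imports) {
            try {
                return Optional.of(Class.forName(prefix + name));
            } catch (ClassNotFoundException e) {
                // Not found under this prefix; fall through to the next one.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Bare name: none of the three prefixes turns it into a real class.
        System.out.println(resolve("CassandraStorage", DEFAULT_IMPORTS).isPresent());
        // Fully qualified name: the empty prefix "" matches it directly,
        // as long as the class is on the classpath.
        System.out.println(resolve("java.lang.StringBuilder", DEFAULT_IMPORTS).isPresent());
    }
}
```

Run as-is, it prints `false` and then `true`.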

-Dan

On Mon, Mar 18, 2013 at 11:56 AM, Mohammed Abdelkhalek <
[EMAIL PROTECTED]> wrote:

> Thank you for replying.
> In fact, I'm trying to run this script:
> grunt> rows = LOAD 'cassandra://Keyspace1/Users' USING CassandraStorage()
> AS (key, columns: bag {T: tuple(name, value)});
> grunt> cols = FOREACH rows GENERATE flatten(columns);
> grunt> colnames = FOREACH cols GENERATE $0;
> grunt> namegroups = GROUP colnames BY (chararray) $0;
> grunt> namecounts = FOREACH namegroups GENERATE COUNT($1), group;
> grunt> orderednames = ORDER namecounts BY $0;
> grunt> topnames = LIMIT orderednames 50;
> grunt> dump topnames;
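The script above flattens each row's bag of columns, groups the columns by name, counts each group, sorts by that count, and keeps 50 groups. Because `ORDER ... BY $0` sorts in ascending order by default, `topnames` actually holds the 50 least frequent column names; `ORDER namecounts BY $0 DESC` would keep the most frequent ones. A minimal in-memory Java sketch of the same pipeline follows. It assumes hypothetical `Row` and `Column` records in place of Cassandra rows, uses made-up sample data, and adds a tie-break on the column name that the Pig script does not have:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ColumnNameCounts {
    // Hypothetical stand-ins for a Cassandra row: a key plus a bag of (name, value) columns.
    record Column(String name, String value) {}
    record Row(String key, List<Column> columns) {}

    // FLATTEN(columns) -> GROUP BY name -> COUNT -> ORDER BY count ASC -> LIMIT limit
    static List<Map.Entry<String, Long>> nameCounts(List<Row> rows, int limit) {
        Map<String, Long> counts = rows.stream()
                .flatMap(row -> row.columns().stream())      // FLATTEN(columns)
                .collect(Collectors.groupingBy(Column::name, // GROUP colnames BY $0
                        Collectors.counting()));             // COUNT($1)
        return counts.entrySet().stream()
                // ORDER namecounts BY $0 (ascending); the name tie-break is an addition
                .sorted(Map.Entry.<String, Long>comparingByValue()
                        .thenComparing(Map.Entry.comparingByKey()))
                .limit(limit)                                // LIMIT orderednames 50
                .collect(Collectors.toList());
    }

    // Small made-up sample data for demonstration only.
    static List<Row> sampleRows() {
        return List.of(
                new Row("alice", List.of(new Column("name", "Alice"), new Column("age", "30"))),
                new Row("bob", List.of(new Column("name", "Bob"), new Column("email", "bob@example.com"))),
                new Row("carol", List.of(new Column("name", "Carol"), new Column("age", "41"))));
    }

    public static void main(String[] args) {
        // Prints [email=1, age=2, name=3]: least frequent column names first.
        System.out.println(nameCounts(sampleRows(), 50));
    }
}
```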
>
> But I'm getting this error:
> ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1070: Could not resolve
> CassandraStorage using imports: [, org.apache.pig.builtin.,
> org.apache.pig.impl.builtin.]
>
> 2013/3/18 Dan DeCapria, CivicScience <[EMAIL PROTECTED]>
>
> > Storing to Cassandra requires a key->column->value data structure from
> > Pig. Here is one possible approach, which uses a UDF to convert the Pig
> > data into the layout Cassandra expects:
> >
> > -- sample pig script
> > A = LOAD 'foo' USING PigStorage() AS (key:chararray, name:chararray,
> > value:chararray);
> > B = FOREACH A GENERATE
> >     FLATTEN(com.to.udf.ToCassandraUDF(
> >         TOTUPLE('PotentialKeyManipulation/', $0, '/toSomethingElse'),
> >         TOTUPLE($1), $2));
> > STORE B INTO 'cassandra://cassandraNamespace/myColumnFamilyName' USING
> > org.apache.cassandra.hadoop.pig.CassandraStorage();
> >
> > -- sample toCassandraUDF
> > package com.to.udf;
> >
> > import java.io.IOException;
> > import java.nio.charset.StandardCharsets;
> >
> > import org.apache.pig.EvalFunc;
> > import org.apache.pig.data.BagFactory;
> > import org.apache.pig.data.DataBag;
> > import org.apache.pig.data.DataByteArray;
> > import org.apache.pig.data.DataType;
> > import org.apache.pig.data.Tuple;
> > import org.apache.pig.data.TupleFactory;
> > import org.apache.pig.impl.logicalLayer.FrontendException;
> > import org.apache.pig.impl.logicalLayer.schema.Schema;
> >
> > // Turns (keyParts, columnParts, value) into (key, {(name, value)}),
> > // the row layout CassandraStorage expects when storing.
> > public class ToCassandraUDF extends EvalFunc<Tuple> {
> >
> >     private static final TupleFactory TUPLES = TupleFactory.getInstance();
> >     private static final BagFactory BAGS = BagFactory.getInstance();
> >
> >     // Concatenates the fields of a tuple, writing "null" for null fields.
> >     private static String concat(Tuple parts) {
> >         StringBuilder buf = new StringBuilder();
> >         for (Object o : parts.getAll()) {
> >             buf.append(o == null ? "null" : o.toString());
> >         }
> >         return buf.toString();
> >     }
> >
> >     @Override
> >     public Tuple exec(Tuple input) throws IOException {
> >         if (input == null || input.size() < 2) {
> >             return null;
> >         }
> >         String key = concat((Tuple) input.get(0));
> >         String columnName = concat((Tuple) input.get(1));
> >
> >         // Optional third field is the column value; missing or null -> empty bytes.
> >         Object value = input.size() > 2 ? input.get(2) : null;
> >         byte[] columnValueBytes = value == null
> >                 ? new byte[0]
> >                 : value.toString().getBytes(StandardCharsets.UTF_8);
> >
> >         // A single (name, value) column, both stored as bytearrays.
> >         Tuple column = TUPLES.newTuple(2);
> >         column.set(0, new DataByteArray(columnName.getBytes(StandardCharsets.UTF_8)));
> >         column.set(1, new DataByteArray(columnValueBytes));
> >
> >         DataBag bagOfColumns = BAGS.newDefaultBag();
> >         bagOfColumns.add(column);
> >
> >         Tuple row = TUPLES.newTuple(2);
> >         row.set(0, key);
> >         row.set(1, bagOfColumns);
> >         return row;
> >     }
> >
> >     @Override
> >     public Schema outputSchema(Schema input) {
> >         try {
> >             // Column tuple: (name:bytearray, value:bytearray), matching exec().
> >             Schema columnSchema = new Schema();
> >             columnSchema.add(new Schema.FieldSchema("name", DataType.BYTEARRAY));
> >             columnSchema.add(new Schema.FieldSchema("value", DataType.BYTEARRAY));
> >
> >             // A Pig bag schema wraps a single tuple schema.
> >             Schema bagSchema = new Schema(
> >                     new Schema.FieldSchema("column", columnSchema, DataType.TUPLE));
> >
> >             Schema rowSchema = new Schema();
> >             rowSchema.add(new Schema.FieldSchema("key", DataType.CHARARRAY));
> >             rowSchema.add(new Schema.FieldSchema("columns", bagSchema, DataType.BAG));
> >
> >             // Wrapped in one tuple field, so FLATTEN in the script yields (key, columns).
> >             return new Schema(
> >                     new Schema.FieldSchema("cassandra_tuple", rowSchema, DataType.TUPLE));
> >         } catch (FrontendException e) {
> >             return null; // null tells Pig the output schema is unknown
> >         }
> >     }
> > }
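The UDF's job is to reshape each flat (key, name, value) record into the key->column->value layout: a key plus a bag holding one (name, value) column. The sketch below reproduces that reshaping in plain Java without any Pig classes, so the resulting key and column can be inspected directly. `CassandraRowSketch`, `Row`, `Column`, and the sample values `user42` and `email` are hypothetical, and storing a null value as an empty byte array is an assumption:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class CassandraRowSketch {
    // Hypothetical plain-Java mirror of the (key, {(name, value)}) tuple the UDF emits.
    record Column(byte[] name, byte[] value) {}
    record Row(String key, List<Column> columns) {}

    // Concatenate the parts, writing the literal "null" for null parts.
    static String concat(List<?> parts) {
        StringBuilder sb = new StringBuilder();
        for (Object part : parts) {
            sb.append(part == null ? "null" : part.toString());
        }
        return sb.toString();
    }

    // One input record becomes one row holding a single column.
    // Assumption: a null value is stored as an empty byte array.
    static Row toRow(List<?> keyParts, List<?> columnParts, Object value) {
        byte[] valueBytes = value == null
                ? new byte[0]
                : value.toString().getBytes(StandardCharsets.UTF_8);
        byte[] nameBytes = concat(columnParts).getBytes(StandardCharsets.UTF_8);
        return new Row(concat(keyParts), List.of(new Column(nameBytes, valueBytes)));
    }

    public static void main(String[] args) {
        // Mirrors TOTUPLE('PotentialKeyManipulation/', $0, '/toSomethingElse'), TOTUPLE($1), $2
        Row row = toRow(Arrays.asList("PotentialKeyManipulation/", "user42", "/toSomethingElse"),
                List.of("email"), "user42@example.com");
        System.out.println(row.key());
        System.out.println(new String(row.columns().get(0).name(), StandardCharsets.UTF_8));
    }
}
```

Run as-is, it prints `PotentialKeyManipulation/user42/toSomethingElse` and then `email`, which shows how the `TOTUPLE` pieces in the script are glued into a single row key.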
> >
> > Hope this helps.
> >
> > -Dan
> >
> > On Mon, Mar 18, 2013 at 11:15 AM, Mohammed Abdelkhalek <