Pig >> mail # user >> FLATTEN(bag_of_tuples) error in 0.8.1 ?


FLATTEN(bag_of_tuples) error in 0.8.1 ?
I created a UDF that returns a bag of tuples. It compiles fine, but
when I run it in Pig, I get this error:
12/07/17 16:51:58 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
12/07/17 16:51:58 WARN mapred.LocalJobRunner: job_local_0001
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.pig.data.Tuple
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:392)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:342)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:290)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:237)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:232)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:53)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177)
12/07/17 16:51:58 INFO mapReduceLayer.MapReduceLauncher: HadoopJobId: job_local_0001

It looks like the returned value is somehow wrong, but I checked the
outputSchema() method and it is exactly the same as in the online docs.
Where am I going wrong?

This is Pig 0.8.1. About a month ago I posted a question noting that
FLATTEN(bag_of_tuples) behaves differently in 0.8.1 than in 0.10.0:
0.8.1 keeps the enclosing tuple, while 0.10.0 strips it and places the
fields at the root level.
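
To make the difference described above concrete, here is a minimal plain-Java sketch of the two behaviours. It is only a model, not Pig code: a "tuple" is a List<Object>, a "bag" is a list of tuples, and the class and method names (FlattenSketch, flattenStripped, flattenKeepingTuple) are made up for illustration. It shows the row shapes as reported in this post, not how Pig implements FLATTEN internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the two FLATTEN behaviours reported in this post.
// Hypothetical names throughout; no Pig classes are used.
public class FlattenSketch {

    // Same contents as the bag built in DemoUdf.exec(): {(xx,yy),(xxx,yyy)}
    public static List<List<Object>> demoBag() {
        List<List<Object>> bag = new ArrayList<>();
        bag.add(Arrays.<Object>asList("xx", "yy"));
        bag.add(Arrays.<Object>asList("xxx", "yyy"));
        return bag;
    }

    // Shape reported for 0.10.0: one row per bag element,
    // with the tuple's fields placed directly at the root level.
    public static List<List<Object>> flattenStripped(List<List<Object>> bag) {
        List<List<Object>> rows = new ArrayList<>();
        for (List<Object> tuple : bag) {
            rows.add(new ArrayList<>(tuple));
        }
        return rows;
    }

    // Shape reported for 0.8.1: one row per bag element,
    // but the row holds a single field, the enclosing tuple itself.
    public static List<List<Object>> flattenKeepingTuple(List<List<Object>> bag) {
        List<List<Object>> rows = new ArrayList<>();
        for (List<Object> tuple : bag) {
            List<Object> row = new ArrayList<>();
            row.add(tuple);
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(flattenStripped(demoBag()));     // [[xx, yy], [xxx, yyy]]
        System.out.println(flattenKeepingTuple(demoBag())); // [[[xx, yy]], [[xxx, yyy]]]
    }
}
```

In the first shape $0 of each row is a string; in the second, $0 is a whole tuple. Code that expects one shape and receives the other would hit a cast failure of the same kind as the ClassCastException above, though whether that is what happens here is only a guess.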

Thanks!
yang

///// DemoUdf.java

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultDataBag;
import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class DemoUdf extends EvalFunc<DataBag> {

    @Override
    public DataBag exec(Tuple args) throws IOException {
        Tuple t1 = new DefaultTuple();
        t1.append("xx");
        t1.append("yy");
        Tuple t2 = new DefaultTuple();
        t2.append("xxx");
        t2.append("yyy");
        DataBag b = new DefaultDataBag();
        b.add(t1);
        b.add(t2);
        return b;
    }

    // schema is bagContent:bag{bagContentTuple:tuple(x, y)}
    @Override
    public Schema outputSchema(Schema input) {
        try {
            Schema insideTuple = new Schema(); // this is a tuple
            insideTuple.add(new Schema.FieldSchema("x", DataType.CHARARRAY));
            insideTuple.add(new Schema.FieldSchema("y", DataType.CHARARRAY));
            Schema out = new Schema();
            out.add(new Schema.FieldSchema("bagContent",
                    bagOfTuples("bagContent", insideTuple), DataType.BAG));
            return out;
        } catch (FrontendException e) {
            e.printStackTrace();
            return null;
        }
    }

    private Schema bagOfTuples(String bagName, Schema tupleSchema)
            throws FrontendException {
        Schema bagSchema = new Schema();
        // the name does not really matter here, you will see it only on describe output
        bagSchema.add(new Schema.FieldSchema(bagName + "Tuple", tupleSchema,
                DataType.TUPLE));
        return bagSchema;
    }

}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
///   src/test/java/DemoTest.java

import org.apache.pig.pigunit.PigTest;
import org.junit.Test;

public class DemoTest {

    @Test
    public void blah() {}

    @Test
    public void testSimple() throws Exception {
        PigTest test = new PigTest("src/test/resources/test_demo.pig");
        // sample input data schema
        // x :  bag{(vertex:int, cliques:bag{tuple(id:int, privateId:int)})  }

        String[] inputData = { "1" };
        String[] expectedOutput = { "({xxx,yyy})" };
        // don't really verify anything, too long
        test.assertOutput("inputdata", inputData, "tuples", expectedOutput);
    }

}
/////////////////////////////////////////////////////////////////
/// src/test/resources/test_demo.pig

DEFINE demo DemoUdf();

inputdata = load 'src/test/resources/test_demo.pig' as (x:chararray);
tuples = FOREACH inputdata GENERATE FLATTEN(demo(1)) as kkk;

tuples = FOREACH tuples GENERATE $0;
STORE tuples INTO 'fake_output';