Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Pig >> mail # user >> FLATTEN(bag_of_tuples) error in 0.8.1 ?


Copy link to this message
-
FLATTEN(bag_of_tuples) error in 0.8.1 ?
I created a Udf that returns a Bag of Tuples.  the syntax is all fine, but
when I run it in pig,
Pig gives error:
2/07/17 16:51:58 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with
processName=JobTracker, sessionId= - already initialized
12/07/17 16:51:58 WARN mapred.LocalJobRunner: job_local_0001
java.lang.ClassCastException: java.lang.String cannot be cast to
org.apache.pig.data.Tuple
at
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:392)
 at
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:342)
at
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:290)
 at
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:237)
at
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:232)
 at
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:53)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
 at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
 at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177)
12/07/17 16:51:58 INFO mapReduceLayer.MapReduceLauncher: HadoopJobId:
job_local_0001

it looks that the returned value is wrong somehow. but I checked the
outputSchema() method, and it is exactly the same as
online docs. where am I wrong?
---- this is pig 0.8.1 .       I posted a question about 1 month ago,
stating that 0.8.1 FLATTEN(bag_of_tuples) behavior is different from
0.10.0, in that
it keeps the enclosing tuple, while 0.10.0 strips it and places the fields
at the root level.

Thanks!
yang

///// DemoUdf.java

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultDataBag;
import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class DemoUdf  extends EvalFunc<DataBag> {

 @Override
public DataBag exec(Tuple args) throws IOException {

 Tuple t1 = new DefaultTuple();
t1.append("xx");
t1.append("yy");
 Tuple t2 = new DefaultTuple();
t2.append("xxx");
 t2.append("yyy");
 DataBag b = new DefaultDataBag();
 b.add(t1);
b.add(t2);
return b;
 }

// schema is bagContent:bag{bagContentTuple:tuple(x, y)}
@Override
 public Schema outputSchema(Schema input) {
try {

Schema insideTuple = new Schema();// this is a tuple
 insideTuple.add(new Schema.FieldSchema("x", DataType.CHARARRAY));
insideTuple.add(new Schema.FieldSchema("y", DataType.CHARARRAY));
 Schema out = new Schema();
out.add(new Schema.FieldSchema("bagContent", bagOfTuples("bagContent",
insideTuple), DataType.BAG));
 return out;
 } catch (FrontendException e) {
e.printStackTrace();
return null;
 }
}

private Schema bagOfTuples(String bagName, Schema tupleSchema) throws
FrontendException {
 Schema bagSchema = new Schema();
// the name does not really matter here, you will see it only on describe
output
 bagSchema.add(new Schema.FieldSchema(bagName + "Tuple", tupleSchema,
DataType.TUPLE ));

return bagSchema;
 }

}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
///   src/test/java/DemoTest.java

import org.apache.pig.pigunit.PigTest;
import org.junit.Test;

public class DemoTest {
 @Test
public void blah() {}
 @Test
public void testSimple() throws Exception {
     PigTest test = new PigTest("src/test/resources/test_demo.pig");
 // sample input data schema
    // x :  bag{(vertex:int, cliques:bag{tuple(id:int, privateId:int)})  }

    String [] inputData = { "1" };
    String [] expectedOutput = { "({xxx,yyy})"};
    // don't really verify anything, too long
    test.assertOutput("inputdata", inputData, "tuples", expectedOutput);

}

 }
/////////////////////////////////////////////////////////////////
/// src/test/resources/test_demo.pig

DEFINE demo DemoUdf();

inputdata = load 'src/test/resources/test_demo.pig' as (x:chararray);
tuples = FOREACH inputdata GENERATE FLATTEN(demo(1)) as kkk;

tuples = FOREACH tuples GENERATE $0;
STORE tuples INTO 'fake_output';