-
Question on custom store function
felix gao 2011-11-02, 01:07
I have written a custom store function based primarily on the MultiStorage store function. I use it like this:

    store load_log INTO '/Users/felix/Documents/pig/multi_store_output/ns_{0}/site_{1}' using MyMultiStorage('2,1', '1,2');

where {0} and {1} are substituted with the tuple values at index 0 and index 1. Everything works, and all the data is written to the correct place. The only problem is that in the setStoreLocation function we have to call FileOutputFormat.setOutputPath(job, new Path(location)). My output location is '/Users/felix/Documents/pig/multi_store_output/ns_{0}/site_{1}', so a folder literally named ns_{0}/site_{1} is created in my filesystem. Is there a way to tell Hadoop not to create that output directory?

Thanks,
Felix
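For readers unfamiliar with the {0} and {1} placeholders: the store function posted later in this thread fills them with java.text.MessageFormat. The sketch below shows only that substitution step. The class and method names are hypothetical and are not part of Pig or Hadoop; the values are converted to strings first, as the posted code does, so that numbers are inserted verbatim rather than formatted with locale-specific grouping.

```java
import java.text.MessageFormat;

// Hypothetical helper, not part of Pig or Hadoop: fills a path template
// such as "ns_{0}/site_{1}" with values taken from a record.
class PathTemplateDemo {

    // Converts every value to a String before formatting, so MessageFormat
    // inserts it as-is instead of applying number formatting.
    static String render(String template, Object... values) {
        Object[] asStrings = new Object[values.length];
        for (int i = 0; i < values.length; i++) {
            asStrings[i] = String.valueOf(values[i]);
        }
        return new MessageFormat(template).format(asStrings);
    }

    public static void main(String[] args) {
        // Sample values are made up for illustration.
        System.out.println(render("ns_{0}/site_{1}", "news", 1234));
    }
}
```

Note that MessageFormat treats single quotes in the pattern as escape characters, so a template containing an apostrophe would need extra care.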
-
Re: Question on custom store function
Ashutosh Chauhan 2011-11-02, 17:47
Hey Felix,

>> The only problem is that in the setStoreLocation function we have to call
>> FileOutputFormat.setOutputPath(job, new Path(location));

Can't you massage location into the string you want?

Ashutosh
-
Re: Question on custom store function
felix gao 2011-11-02, 21:10
Ashutosh,

The problem is that I don't want to use that location at all, since I am constructing the output location from the tuple input. The location is just a placeholder into which I substitute the right parameters.

Felix
-
Re: Question on custom store function
Ashutosh Chauhan 2011-11-02, 22:00
Then don't call FileOutputFormat.setOutputPath(job, new Path(location));
It looks like I am missing something here.

Ashutosh
-
Re: Question on custom store function
felix gao 2011-11-02, 22:19
If you don't call that function, Hadoop throws an exception because no output is set for the job. Something like:

Caused by: org.apache.hadoop.mapred.InvalidJobConfException: Output directory not set.
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:120)
    at org.apache.pig.newplan.logical.rules.InputOutputFileValidator$InputOutputFileVisitor.visit(InputOutputFileValidator.java:87)

So I have to set it and then somehow delete it after Pig completes.
-
Re: Question on custom store function
Dmitriy Ryaboy 2011-11-03, 16:45
Don't use FileOutputFormat? Or rather, use something that extends it and overrides the validation.
-
Re: Question on custom store function
Raghu Angadi 2011-11-04, 22:31
You need to set the output path to '/Users/felix/Documents/pig/multi_store_output' in your setStoreLocation(). Alternatively, for clarity, you could modify your store UDF to be more like:

    store load_log INTO '/Users/felix/Documents/pig/multi_store_output' using MyMultiStorage('ns_{0}/site_{1}', '2,1', '1,2');

The reason FileOutputFormat needs a real path is that at run time Hadoop actually writes to a temporary path and then moves the output to the final path if the job succeeds.

Raghu.
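If the original single-string syntax had to be kept, the same separation could in principle be done inside the store function: cut the location at the first path component that contains a placeholder, hand the fixed part to FileOutputFormat.setOutputPath, and keep the rest as the per-record template. The following is only a sketch of that string handling under those assumptions; LocationSplitDemo and split are hypothetical names, and no Hadoop call is made here.

```java
// Hypothetical helper, not part of Pig or Hadoop: separates a store location
// into a real base directory and a per-record path template.
class LocationSplitDemo {

    // Returns { base, template }. The cut is made at the last '/' that
    // precedes the first '{', so the base never contains a placeholder.
    static String[] split(String location) {
        int brace = location.indexOf('{');
        if (brace < 0) {
            return new String[] { location, "" };   // no placeholders at all
        }
        int slash = location.lastIndexOf('/', brace);
        if (slash < 0) {
            return new String[] { "", location };   // relative template only
        }
        String base = (slash == 0) ? "/" : location.substring(0, slash);
        String template = location.substring(slash + 1);
        return new String[] { base, template };
    }

    public static void main(String[] args) {
        String[] parts = split(
                "/Users/felix/Documents/pig/multi_store_output/ns_{0}/site_{1}");
        System.out.println("base     = " + parts[0]);
        System.out.println("template = " + parts[1]);
    }
}
```

Passing the template as a separate constructor argument, as suggested above, avoids this parsing entirely and keeps the INTO clause a plain directory.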
-
Re: Question on custom store function
felix gao 2011-11-08, 18:41
Raghu,

I changed the code to what you suggested, but I get an exception when I try to store.

java.io.IOException: File already exists:file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-a/part-r-00000
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:228)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:335)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:368)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:484)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:465)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:372)
    at com.bluekai.analytics.pig.storage.BkMultiStorage$MultiStorageOutputFormat$1.createOutputStream(BkMultiStorage.java:325)
    at com.bluekai.analytics.pig.storage.BkMultiStorage$MultiStorageOutputFormat$1.getStore(BkMultiStorage.java:304)
    at com.bluekai.analytics.pig.storage.BkMultiStorage$MultiStorageOutputFormat$1.getStore(BkMultiStorage.java:298)
    at com.bluekai.analytics.pig.storage.BkMultiStorage$MultiStorageOutputFormat$1.write(BkMultiStorage.java:285)
    at com.bluekai.analytics.pig.storage.BkMultiStorage$MultiStorageOutputFormat$1.write(BkMultiStorage.java:261)
    at com.bluekai.analytics.pig.storage.BkMultiStorage.putNext(BkMultiStorage.java:184)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat$PigRecordWriter.write(PigOutputFormat.java:138)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat$PigRecordWriter.write(PigOutputFormat.java:97)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:508)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.processOnePackageOutput(PigMapReduce.java:395)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:381)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:250)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:216)

where prefix-a is dynamically generated based on my tuple.

final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-0/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-1/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-2/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-3/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-4/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-5/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-6/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-7/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-8/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-9/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-A/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-B/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-C/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-D/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-E/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-F/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-G/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-H/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-I/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-J/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-K/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-L/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-M/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-N/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-O/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-P/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-Q/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-R/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-S/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-T/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-U/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-V/part-r-00000
final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/pre
-
Re: Question on custom store function
Raghu Angadi 2011-11-08, 19:57
The path is certainly case sensitive. I'm not sure why this file already exists. Could you post the relevant implementation here?
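One thing that may be worth checking, given that the log lists prefix-A as written before prefix-a fails: whether any generated keys differ only by case. This matters only if the job runs against a filesystem that is not case sensitive, which is an assumption; the thread does not say which filesystem is in use. The sketch below is a hypothetical diagnostic, unrelated to the Pig or Hadoop APIs, that reports such keys.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical diagnostic, not part of Pig or Hadoop: finds output keys that
// would map to the same directory on a case-insensitive filesystem.
class CaseCollisionDemo {

    // Groups keys by their lower-cased form and keeps only the groups that
    // contain more than one distinct spelling.
    static Map<String, Set<String>> collisions(Collection<String> keys) {
        Map<String, Set<String>> byFolded = new TreeMap<>();
        for (String key : keys) {
            byFolded.computeIfAbsent(key.toLowerCase(Locale.ROOT), k -> new TreeSet<>())
                    .add(key);
        }
        byFolded.values().removeIf(group -> group.size() < 2);
        return byFolded;
    }

    public static void main(String[] args) {
        // Key names taken from the log in this thread.
        System.out.println(collisions(Arrays.asList("prefix-0", "prefix-A", "prefix-a")));
    }
}
```

An empty result would rule this explanation out and point back at how the store function opens its output streams.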
-
Re: Question on custom store function
felix gao 2011-11-08, 20:32
Sure thing. Below is the entire source code for the store function.

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
 * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and limitations under the License.
 */
package com.bluekai.analytics.pig.storage;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.text.MessageFormat;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.StorageUtil;

public class BkMultiStorage extends StoreFunc {

    private String outputPath; // User specified output path
    private MessageFormat messageFormat = new MessageFormat("/dev/null");
    private List<Integer> splitFieldIndex = new ArrayList<Integer>(); // Indexes of the key fields used for path construction
    private Set<Integer> skippedFieldIndex = new HashSet<Integer>(); // Indexes of the fields to skip when storing to HDFS
    private final String fieldDel; // Delimiter of the output record
    private Compression comp; // Compression type of output data
    private RecordWriter<String, Tuple> writer;
    private static final TupleFactory tf = TupleFactory.getInstance();
    private static int constructed_times = 0;
    private static int entred_setLocation = 0;

    // Compression types supported by this store
    enum Compression {
        none, bz2, bz, gz;
    };

    public BkMultiStorage(String output, String splitFieldsIndex) {
        this(output, splitFieldsIndex, "");
    }

    public BkMultiStorage(String output, String splitFieldsIndex, String skippedFieldsIndex) {
        this(output, splitFieldsIndex, skippedFieldsIndex, "none");
    }

    public BkMultiStorage(String output, String splitFieldsIndex, String skippedFieldsIndex,
            String compression) {
        this(output, splitFieldsIndex, skippedFieldsIndex, compression, "\\t");
    }

    // TODO: make this method more robust with a customized split separator
    // Parses a comma-separated list of field indexes, e.g. "2,1"; an empty string yields an empty list.
    private List<Integer> fieldsParser(String fieldString) {
        List<Integer> output = new ArrayList<Integer>();
        if (fieldString == null || fieldString.trim().isEmpty()) {
            return output;
        }
        for (String part : fieldString.trim().split(",")) {
            // Skip empty parts (e.g. from "1,,2") instead of failing in Integer.valueOf
            if (!part.trim().isEmpty()) {
                output.add(Integer.valueOf(part.trim()));
            }
        }
        return output;
    }

    /**
     * Constructor
     *
     * @param output
     *            Output path pattern, in MessageFormat syntax
     * @param splitFieldsIndex
     *            Comma-separated indexes of the key fields
     * @param skippedFieldsIndex
     *            Comma-separated indexes of the fields to skip when storing
     * @param compression
     *            'bz2', 'bz', 'gz' or 'none'
     * @param fieldDel
     *            Output record field delimiter.
     */
    public BkMultiStorage(String output, String splitFieldsIndex, String skippedFieldsIndex,
            String compression, String fieldDel) {
        this.messageFormat = new MessageFormat(output);
        this.splitFieldIndex = fieldsParser(splitFieldsIndex);
        this.skippedFieldIndex = new HashSet<Integer>(fieldsParser(skippedFieldsIndex));
        this.fieldDel = fieldDel;
        try {
            this.comp = (compression == null) ? Compression.none
                    : Compression.valueOf(compression.toLowerCase());
        } catch (IllegalArgumentException e) {
            System.err.println("Exception when converting compression string: " + compression
                    + " to enum. No compression will be used");
            this.comp = Compression.none;
        }
        constructed_times++;
    }

    //--------------------------------------------------------------------------
    // Implementation of StoreFunc

    @Override
    public void putNext(Tuple tuple) throws IOException {
        // construct the output key
        int tupleSize = tuple.size();
        Object[] pathStore = new Object[splitFieldIndex.size()];
        int startIndex = 0;
        for (int index : splitFieldIndex) {
            if (tupleSize <= index) {
                throw new IOException("split field index:" + index + " >= tuple size:" + tupleSize);
            }
            pathStore[startIndex++] = String.valueOf(tuple.get(index));
        }

        // check to see if I should skip this field when write to HDFS
        Tuple writeTuple;
        if (this.skippedFieldIndex.size() == 0) {
            writeTuple = tuple;
        } else {
            writeTuple = tf.newTuple();
            for (int i = 0; i < tupleSize; i++) {
                if (!this.skippedFieldIndex.contains(i))
                    writeTuple.append(tuple.get(i));
            }
        }
        String outputPath = messageFormat.format(pathStore);
        //construct a new tuple or remove fields from tuple that needs
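The visible part of putNext does two things with each record: it collects the split fields that feed the path pattern, and it drops the skipped fields from what gets written. The sketch below restates those two steps with plain java.util lists instead of Pig tuples, so it can be run without Pig or Hadoop on the classpath. SplitAndSkipDemo, its method names, and the sample row are hypothetical; the rest of putNext is cut off in the post and is not reconstructed here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the record handling in putNext, using plain lists
// instead of Pig tuples.
class SplitAndSkipDemo {

    // Collects the values of the split fields, in the order the indexes are given.
    static Object[] pathArgs(List<?> row, List<Integer> splitIndexes) {
        Object[] args = new Object[splitIndexes.size()];
        int next = 0;
        for (int index : splitIndexes) {
            if (index >= row.size()) {
                throw new IllegalArgumentException(
                        "split field index " + index + " >= row size " + row.size());
            }
            args[next++] = String.valueOf(row.get(index));
        }
        return args;
    }

    // Copies the row, leaving out every field whose index is in skippedIndexes.
    static List<Object> withoutSkipped(List<?> row, Set<Integer> skippedIndexes) {
        List<Object> kept = new ArrayList<>();
        for (int i = 0; i < row.size(); i++) {
            if (!skippedIndexes.contains(i)) {
                kept.add(row.get(i));
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Made-up row; '2,1' and '1,2' mirror the arguments used in the thread.
        List<String> row = Arrays.asList("x", "news", "42", "payload");
        System.out.println(Arrays.toString(pathArgs(row, Arrays.asList(2, 1))));
        System.out.println(withoutSkipped(row, new HashSet<>(Arrays.asList(1, 2))));
    }
}
```

With split indexes '2,1', placeholder {0} receives field 2 and {1} receives field 1, which is worth keeping in mind when reading the path pattern.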