-
Hi,all. How can I involve two avro files with different schema into one M/R job?
幻 2011-03-18, 03:13
Hi, all. Currently I have two Avro files with different schemas. I found that I have to set the schema before running an M/R job if the files are in Avro format, but the files' schemas are probably not the same. How can I do this without setting a schema before running the job? Thanks.
Best Regards,
Xu Yingzhong
-
Re: Hi,all. How can I involve two avro files with different schema into one M/R job?
Doug Cutting 2011-03-18, 16:51
On 03/17/2011 08:13 PM, 幻 wrote:
> Currently,I have two avro files with different schema. I found that
> I have to set the schema before running a M/R job if the files are in
> avro format.But the schema of the files are probably not the same.How
> can I do that without setting the schema before running a job? Thanks.

The schema you set for the job is the reader's schema. The schema in the input files is the writer's schema, and it need not match the reader's schema exactly. The data will be projected to the reader's schema, as described in the specification, particularly in the "Schema Resolution" section:

http://avro.apache.org/docs/current/spec.html#Schema+Resolution

The aliases section is also relevant:

http://avro.apache.org/docs/current/spec.html#Aliases

This can be used to extract fields from different schemas into a common data structure. For example, if your input files use the following two schemas:

  {"type":"record", "name":"a.A", "fields":[{"name":"foo", "type":"int"}]}
  {"type":"record", "name":"b.B", "fields":[{"name":"bar", "type":"int"}]}

then the following record can read both:

  {"type":"record", "name":"my.MapInput",
   "aliases":["a.A","b.B"],
   "fields":[{"name":"x", "type":"int", "aliases":["foo","bar"]}]
  }

The reader's schema can thus include a common subset of the fields in the inputs. It can map fields of compatible types that are named differently to a common field. It can include fields that are not in all inputs, so long as they have a default value in the reader's schema. It could even include all data from all inputs, e.g., in the above case:

  {"type":"record", "name":"my.MapInput",
   "aliases":["a.A","b.B"],
   "fields":[
     {"name":"foo", "type":"int", "default": -1},
     {"name":"bar", "type":"int", "default": -1}
   ]
  }

So there's a fair amount of flexibility available.

Doug
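For illustration, the field-matching rules described above can be modeled in a few lines of plain Java. This is a simplified toy, not Avro's own resolver: records are plain maps of int fields, record-name aliases (a.A, b.B) are ignored, and the class, field and method names are made up for this sketch. The two reader field lists mirror the two my.MapInput schemas above.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model (not Avro's implementation) of resolving a reader's record
 * fields against data written with a different schema:
 *  - a reader field takes the writer field with the same name, or with one
 *    of the reader field's aliases;
 *  - if there is no such writer field, the reader field's default is used;
 *  - with no match and no default, resolution fails;
 *  - writer fields the reader does not mention are skipped.
 * Records are modeled as maps from field name to int value.
 */
public class AliasResolution {

    static final class ReaderField {
        final String name;
        final List<String> aliases;
        final Integer defaultValue; // null means "no default"

        ReaderField(String name, List<String> aliases, Integer defaultValue) {
            this.name = name;
            this.aliases = aliases;
            this.defaultValue = defaultValue;
        }
    }

    // Like the first my.MapInput schema: one field "x", aliased to "foo" and "bar".
    static final List<ReaderField> COMMON =
        List.of(new ReaderField("x", List.of("foo", "bar"), null));

    // Like the second my.MapInput schema: both fields, each defaulting to -1.
    static final List<ReaderField> ALL = List.of(
        new ReaderField("foo", List.of(), -1),
        new ReaderField("bar", List.of(), -1));

    static Map<String, Integer> resolve(Map<String, Integer> written, List<ReaderField> reader) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (ReaderField f : reader) {
            Integer value = written.get(f.name);
            // If the field's own name is absent, try its aliases in order.
            for (Iterator<String> it = f.aliases.iterator(); value == null && it.hasNext(); ) {
                value = written.get(it.next());
            }
            if (value == null) {
                if (f.defaultValue == null) {
                    throw new IllegalArgumentException("no value and no default for " + f.name);
                }
                value = f.defaultValue;
            }
            result.put(f.name, value);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> fromA = Map.of("foo", 7); // written with a.A
        Map<String, Integer> fromB = Map.of("bar", 9); // written with b.B
        System.out.println(resolve(fromA, COMMON)); // {x=7}
        System.out.println(resolve(fromB, COMMON)); // {x=9}
        System.out.println(resolve(fromA, ALL));    // {foo=7, bar=-1}
        System.out.println(resolve(fromB, ALL));    // {foo=-1, bar=9}
    }
}
```

Both reader schemas accept both inputs: the first collapses foo and bar into x, while the second keeps both fields and fills the missing one with its default.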
-
Re: Hi,all. How can I involve two avro files with different schema into one M/R job?
Harsh J 2011-03-18, 16:54
Doug,

Would it help if the provided JSON schemas were added to the JobConf, with the given path(s) as a prefix to the key used to retrieve them? This would help when using MultipleInputs and the like (though it may get complicated if globs are involved).

--
Harsh J
http://harshj.com
-
Re: Hi,all. How can I involve two avro files with different schema into one M/R job?
Doug Cutting 2011-03-18, 18:08
On 03/18/2011 09:54 AM, Harsh J wrote:
> Would it help if the provided JSON schemae were added to the JobConf
> with the given path(s) as a prefix to the key used to retrieve them?
> This would help use with MultipleInputs and such (but it may get
> complicated to do if globs were involved?).
Not sure what you mean by "provided". The user must specify a reader schema for the job, and map tasks will be passed instances of this schema. That reader schema is available as AvroJob.getInputSchema(job).
If you wanted to know the writer's schema of each file read, then I suppose we could have AvroRecordReader set that to a job property, or simply expose this by adding a method like:
  // in AvroRecordReader, which wraps a FileReader<T> named "reader"
  public Schema getSchema() { return reader.getSchema(); }
Is that what you're after? Why would you need this?
Doug
-
Re: Hi,all. How can I involve two avro files with different schema into one M/R job?
Harsh J 2011-03-18, 18:31
On Fri, Mar 18, 2011 at 11:38 PM, Doug Cutting <[EMAIL PROTECTED]> wrote:
> Is that what you're after? Why would you need this?

Probably a small case, in which I would need to read from multiple sources in my job (perhaps even processing them differently until the map phase), with a special reader schema for each source. This could easily be custom-built, but I just wondered whether general uses of Avro data files could benefit from such a thing.

Right now AvroJob.setInputSchema(...) sets the given schema as "avro.input.schema" in the job. My suggestion was to make it something like /path/1+avro.input.schema, /path/2+avro.input.schema, so that each record reader instantiated for the mappers (via MultipleInputs) can pick up its own reader schema (since each one gets a /path/2 via its FileSplit).

--
Harsh J
http://harshj.com
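A minimal sketch of the per-path idea, assuming java.util.Properties as a stand-in for JobConf. The key format mirrors the "/path/1+avro.input.schema" example above; the helper names, the parent-directory lookup (a FileSplit path names a file such as /path/2/part-00000.avro, not the directory that was registered) and the fallback to the job-wide schema are assumptions of this sketch, not existing Avro behavior.

```java
import java.util.Properties;

/**
 * Hypothetical sketch of per-path reader schemas kept in a job configuration.
 * java.util.Properties stands in for Hadoop's JobConf. None of these helpers
 * is an existing Avro or Hadoop API.
 */
public class PerPathSchemas {
    static final String INPUT_SCHEMA = "avro.input.schema";

    /** Key format from the proposal: "<path>+avro.input.schema". */
    static String keyFor(String path) {
        return path + "+" + INPUT_SCHEMA;
    }

    /** Client side: register a reader schema for one input path (file or directory). */
    static void setInputSchema(Properties conf, String path, String schemaJson) {
        conf.setProperty(keyFor(path), schemaJson);
    }

    /**
     * Task side: find the reader schema for a split's file by trying the file
     * path, then each enclosing directory, and finally the job-wide schema.
     * Glob patterns are not handled.
     */
    static String getInputSchema(Properties conf, String splitPath) {
        for (String p = splitPath; !p.isEmpty(); p = p.substring(0, Math.max(0, p.lastIndexOf('/')))) {
            String schema = conf.getProperty(keyFor(p));
            if (schema != null) {
                return schema;
            }
        }
        return conf.getProperty(INPUT_SCHEMA);
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        setInputSchema(conf, "/path/1",
            "{\"type\":\"record\",\"name\":\"a.A\",\"fields\":[{\"name\":\"foo\",\"type\":\"int\"}]}");
        setInputSchema(conf, "/path/2",
            "{\"type\":\"record\",\"name\":\"b.B\",\"fields\":[{\"name\":\"bar\",\"type\":\"int\"}]}");
        // A record reader for a file under /path/2 picks up the b.B schema.
        System.out.println(getInputSchema(conf, "/path/2/part-00000.avro"));
    }
}
```

Matching on whole parent paths rather than string prefixes keeps /path/10 from accidentally picking up the schema registered for /path/1.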
-
Re: Hi,all. How can I involve two avro files with different schema into one M/R job?
Doug Cutting 2011-03-18, 19:59
On 03/18/2011 11:31 AM, Harsh J wrote:
> Probably a small case, in which I would require reading from multiple
> sources in my job (perhaps even process them differently until the Map
> phase), with special reader-schemas for each of my sources.
How would your mapper detect which schema was in use? Would it use something like instanceof? If that's the case, then you could simply use a union as the job's schema.
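A sketch of the union approach, assuming the job's reader schema is a union of the two record schemas (shown as a JSON string) and that the specific reader hands the mapper instances of the generated classes. Here A and B are hypothetical hand-written stand-ins for those generated classes, so the snippet runs without Avro on the classpath.

```java
import java.util.List;

/**
 * Mapper-side dispatch with instanceof when the job's reader schema is a
 * union. A and B are hypothetical stand-ins for the classes Avro's specific
 * compiler would generate for a.A and b.B.
 */
public class UnionDispatch {
    /** The job's reader schema would be a union of both record schemas. */
    static final String UNION_SCHEMA =
        "[{\"type\":\"record\",\"name\":\"a.A\",\"fields\":[{\"name\":\"foo\",\"type\":\"int\"}]},"
      + " {\"type\":\"record\",\"name\":\"b.B\",\"fields\":[{\"name\":\"bar\",\"type\":\"int\"}]}]";

    static final class A { final int foo; A(int foo) { this.foo = foo; } }
    static final class B { final int bar; B(int bar) { this.bar = bar; } }

    /** Map function: each input datum is either an A or a B. */
    static String map(Object datum) {
        if (datum instanceof A) {
            return "A:" + ((A) datum).foo;
        } else if (datum instanceof B) {
            return "B:" + ((B) datum).bar;
        }
        throw new IllegalArgumentException("unexpected type: " + datum.getClass());
    }

    public static void main(String[] args) {
        List<Object> inputs = List.of(new A(1), new B(2));
        for (Object datum : inputs) {
            System.out.println(map(datum)); // prints A:1, then B:2
        }
    }
}
```

Per the spec's union rule, data written with a.A resolves against the first matching branch of the reader's union, so a single mapper sees both types and only needs the instanceof check.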
Or would you want a different mapper for each input type? That seems like a job for a higher-level tool, like Hadoop's MultipleInputs. It shouldn't be too hard to build, but I don't think it should be built into the base MapReduce API; rather, it belongs in a layer above it, no?
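A rough sketch of such a layer, in the spirit of MultipleInputs: each input directory is registered with its own map function, and the one whose directory is the longest prefix of the split's path is chosen. PerInputMappers and its methods are hypothetical names, not part of Avro or Hadoop.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical per-input mapper registry sitting above the base MapReduce
 * API: the framework would call mapperFor(split path) once per split.
 */
public class PerInputMappers {
    private final Map<String, Function<Object, String>> mappers = new HashMap<>();

    /** Registers a map function for every file under the given directory. */
    void addInputPath(String dir, Function<Object, String> mapper) {
        mappers.put(dir.endsWith("/") ? dir : dir + "/", mapper);
    }

    /** Picks the map function whose directory is the longest prefix of the split path. */
    Function<Object, String> mapperFor(String splitPath) {
        String best = null;
        for (String dir : mappers.keySet()) {
            if (splitPath.startsWith(dir) && (best == null || dir.length() > best.length())) {
                best = dir;
            }
        }
        if (best == null) {
            throw new IllegalArgumentException("no mapper registered for " + splitPath);
        }
        return mappers.get(best);
    }

    public static void main(String[] args) {
        PerInputMappers inputs = new PerInputMappers();
        inputs.addInputPath("/data/a", datum -> "a-mapper saw " + datum);
        inputs.addInputPath("/data/b", datum -> "b-mapper saw " + datum);
        System.out.println(inputs.mapperFor("/data/b/part-00000.avro").apply(42)); // b-mapper saw 42
    }
}
```

Choosing the longest matching prefix lets a nested directory override its parent; a fuller version would also register a reader schema alongside each map function.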
Doug
-
Re: Hi,all. How can I involve two avro files with different schema into one M/R job?
幻 2011-03-21, 02:15
Thanks. I ran into this problem because I want to use Avro as a storage format for Hive. I have found a solution, but I'm not sure it's good enough. I changed AvroInputFormat to:

    protected FileStatus[] listStatus(JobConf job) throws IOException {
      List<FileStatus> result = new ArrayList<FileStatus>();
      for (FileStatus file : super.listStatus(job)) {
        if (file.getPath().getName().endsWith(AvroOutputFormat.EXT)) {
          // Store this file's writer schema in the job, keyed by its path.
          this.setJobSchemas(job, file);
          result.add(file);
        }
      }
      return result.toArray(new FileStatus[0]);
    }

And in AvroRecordReader:

    public AvroRecordReader(JobConf job, FileSplit split) throws IOException {
      // Previously every file was read with AvroJob.getInputSchema(job);
      // now each file is read with the schema stored for its own path.
      this(DataFileReader.openReader
           (new FsInput(split.getPath(), job),
            job.getBoolean(AvroJob.INPUT_IS_REFLECT, false)
            ? new ReflectDatumReader<T>(fileSchema(job, split))
            : new SpecificDatumReader<T>(fileSchema(job, split))),
           split);
    }

    // Looks up the schema stored under "<path>-schema" by listStatus, falling
    // back to the job-wide input schema if none was stored for this path.
    private static Schema fileSchema(JobConf job, FileSplit split) {
      String json = job.get(split.getPath().toString() + "-schema");
      return json != null ? Schema.parse(json) : AvroJob.getInputSchema(job);
    }
-
Re: Hi,all. How can I involve two avro files with different schema into one M/R job?
幻 2011-03-21, 02:22
Sorry, there were some mistakes. This is the AvroInputFormat part:

    protected FileStatus[] listStatus(JobConf job) throws IOException {
      List<FileStatus> result = new ArrayList<FileStatus>();
      for (FileStatus file : super.listStatus(job)) {
        if (file.getPath().getName().endsWith(AvroOutputFormat.EXT)) {
          // Store this file's writer schema in the job, keyed by its path.
          this.setJobSchemas(job, file);
          result.add(file);
        }
      }
      return result.toArray(new FileStatus[0]);
    }

    // Reads the writer's schema from the file header and stores it in the
    // job under the key "<path>-schema".
    private void setJobSchemas(JobConf job, FileStatus file) throws IOException {
      FileSystem fs = file.getPath().getFileSystem(job);
      FSDataInputStream in = fs.open(file.getPath());
      DataFileStream<Object> stream =
          new DataFileStream<Object>(in, new GenericDatumReader<Object>());
      try {
        job.set(file.getPath() + "-schema", stream.getSchema().toString());
      } finally {
        stream.close();
      }
      // Don't call fs.close(): the FileSystem instance is cached and shared,
      // and closing it can break other code using the same FileSystem.
    }