MapReduce, mail # user - How do I set the intermediate output path when I use 2 mapreduce jobs?


Re: Re: Re: Re: How do I set the intermediate output path when I use 2 mapreduce jobs?
Swathi V 2011-09-24, 15:03
Hi Jun Tan,

Yeah, surely!
Well, the code I gave uses the new API.

2011/9/24 谭军 <[EMAIL PROTECTED]>

> Hi Swathi.V.,
> Thank you very much.
> It's very kind of you to do that.
> I think the code you gave is implemented with the old APIs.
> I made it work several days ago. What I can't do is implement it with the
> new APIs.
> I just got started with MapReduce programming and have some problems with
> my code.
> When you have time we can talk online.
> Thanks!
>
> --
>
> Regards!
>
> Jun Tan
>
> At 2011-09-24 01:37:54,"Swathi V" <[EMAIL PROTECTED]> wrote:
>
> Hi Jun Tan,
>
> 1. DistributedCache usage with the new API:
>
>  // Setting up the cache for the application
>
>      1. Copy the requisite files to the FileSystem:
>
>      $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat
>      $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip
>      $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
>      $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
>      $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
>      $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz
>
>      2. Setup the application's JobConf:
>
>      JobConf job = new JobConf();
>      DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),
>                                    job);
>      DistributedCache.addCacheArchive(new URI("/myapp/map.zip"), job);
>      DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
>      DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar"), job);
>      DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz"), job);
>      DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz"), job);
>
>      3. Use the cached files in the Mapper <http://hadoop.apache.org/common/docs/r0.20.1/api/org/apache/hadoop/mapred/Mapper.html>
>      or Reducer <http://hadoop.apache.org/common/docs/r0.20.1/api/org/apache/hadoop/mapred/Reducer.html>:
>
>      public static class MapClass extends MapReduceBase
>      implements Mapper<K, V, K, V> {
>
>        private Path[] localArchives;
>        private Path[] localFiles;
>
>        public void configure(JobConf job) {
>          // Get the cached archives/files
>          localArchives = DistributedCache.getLocalCacheArchives(job);
>          localFiles = DistributedCache.getLocalCacheFiles(job);
>        }
>
>        public void map(K key, V value,
>                        OutputCollector<K, V> output, Reporter reporter)
>        throws IOException {
>          // Use data from the cached archives/files here
>          // ...
>          // ...
>          output.collect(key, value);
>        }
>      }
>
>
> 2. Without the DistributedCache: in simple terms, if you are interested I
> can help you with the code.
>
>
>
> 2011/9/23 谭军 <[EMAIL PROTECTED]>
>
>> Hi Swathi.V.,
>> I think my code below would work:
>>
>>         Configuration conf1 = new Configuration();
>>         Job job1 = new Job(conf1, "Retrieval1");
>>         job1.setJarByClass(Retrieval.class);
>>         job1.addCacheFile(new URI(args[0]));   // problem here
>>         conf1.set("keyNodeFile", args[0]);         // try to set key node file path and get it in mapper1
>>         job1.setOutputKeyClass(Text.class);
>>         job1.setOutputValueClass(Text.class);
>>         job1.setMapperClass(RetrievalMapper.class);
>>         job1.setReducerClass(RetrievalReducer.class);
>>         FileInputFormat.addInputPath(job1, new Path(args[1]));
>>         String out = args[2] + System.nanoTime();
>>
>>         FileOutputFormat.setOutputPath(job1, new Path(out));
>>         job1.waitForCompletion(true);
>>
>>         Configuration conf2 = new Configuration();
>>         Job job2 = new Job(conf2, "Retrieval2");
>>         job2.setJarByClass(Retrieval.class);
>>         conf2.set("newKeyNodeFile", out);   // try to set new key node file path and get it in mapper2
>>         DistributedCache.addCacheFile(new URI(out));  // problem here
>>         job2.setOutputKeyClass(Text.class);
>

Regards,
Swathi.V.