Re: Re: Re: How do I set the intermediate output path when I use 2 mapreduce jobs?
Hi JunTun,

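1. Distributed Cache usage. The example below is written against the old API
(JobConf, MapReduceBase); with the new API the same DistributedCache calls take
job.getConfiguration() in place of the JobConf: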

 // Setting up the cache for the application

     1. Copy the requisite files to the FileSystem:

     $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat
     $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip
     $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
     $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
     $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
     $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz

     2. Set up the application's JobConf:

     JobConf job = new JobConf();
     DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), job);
     DistributedCache.addCacheArchive(new URI("/myapp/map.zip"), job);
     DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
     DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar"), job);
     DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz"), job);
     DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz"), job);

     3. Use the cached files in the Mapper or Reducer:

     public static class MapClass extends MapReduceBase
     implements Mapper<K, V, K, V> {

       private Path[] localArchives;
       private Path[] localFiles;

       public void configure(JobConf job) {
         // Get the cached archives/files
         try {
           localArchives = DistributedCache.getLocalCacheArchives(job);
           localFiles = DistributedCache.getLocalCacheFiles(job);
         } catch (IOException e) {
           // configure() cannot throw a checked exception, so rethrow unchecked
           throw new RuntimeException("Could not read the distributed cache", e);
         }
       }

       public void map(K key, V value,
                       OutputCollector<K, V> output, Reporter reporter)
       throws IOException {
         // Use data from the cached archives/files here
         // ...
         // ...
         output.collect(key, value);
       }
     }

2. Without the distributed cache: this can also be done in simpler terms. If
you are interested, I can help you with the code.
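
A note on the "#lookup.dat" suffix in step 2 of point 1: the part after "#" is the URI fragment, which the distributed cache uses as the name of the symlink for the file, while the part before it is the path on the FileSystem. The sketch below uses only java.net.URI to show how such a string splits. The class and method names (CacheUriDemo, hdfsPath, linkName) are made up for illustration, and falling back to the file name when there is no fragment is an assumption of this sketch, not a statement about what Hadoop does.

```java
import java.net.URI;

public class CacheUriDemo {

    // Path portion of the cache URI, i.e. where the file lives on the FileSystem.
    static String hdfsPath(String cacheUri) {
        return URI.create(cacheUri).getPath();
    }

    // Link name requested by the URI: the fragment after '#' if there is one.
    // Assumption of this sketch: with no fragment, use the last path component.
    static String linkName(String cacheUri) {
        URI uri = URI.create(cacheUri);
        String fragment = uri.getFragment();
        if (fragment != null && !fragment.isEmpty()) {
            return fragment;
        }
        String path = uri.getPath();
        return path.substring(path.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) {
        String cacheUri = "/myapp/lookup.dat#lookup.dat";
        // prints "/myapp/lookup.dat -> lookup.dat"
        System.out.println(hdfsPath(cacheUri) + " -> " + linkName(cacheUri));
    }
}
```

So a file added as "/myapp/lookup.dat#lookup.dat" is requested under the name lookup.dat, which is why the fragment is worth setting explicitly.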

2011/9/23 谭军 <[EMAIL PROTECTED]>

> Hi Swathi.V.,
> I think my code below would work:
>         Configuration conf1 = new Configuration();
>         Job job1 = new Job(conf1, "Retrieval1");
>         job1.setJarByClass(Retrieval.class);
>         job1.addCacheFile(new URI(args[0]));   // problem here
>         conf1.set("keyNodeFile", args[0]);   // try to set key node file path and get file path in mapper1
>         job1.setOutputKeyClass(Text.class);
>         job1.setOutputValueClass(Text.class);
>         job1.setMapperClass(RetrievalMapper.class);
>         job1.setReducerClass(RetrievalReducer.class);
>         FileInputFormat.addInputPath(job1, new Path(args[1]));
>         String out = args[2] + System.nanoTime();
>         FileOutputFormat.setOutputPath(job1, new Path(out));
>         job1.waitForCompletion(true);
>         Configuration conf2 = new Configuration();
>         Job job2 = new Job(conf2, "Retrieval2");
>         job2.setJarByClass(Retrieval.class);
>         conf2.set("newKeyNodeFile", out);   // try to set new key node file path and get it in mapper2
>         DistributedCache.addCacheFile(new URI(out));  // problem here
>         job2.setOutputKeyClass(Text.class);
>         job2.setOutputValueClass(Text.class);
>         job2.setMapperClass(RetrievalMapper2.class);
>         job2.setReducerClass(RetrievalReducer2.class);
>         FileInputFormat.addInputPath(job2, new Path(args[1]));
>         FileOutputFormat.setOutputPath(job2, new Path(args[2]));
>         System.exit(job2.waitForCompletion(true) ? 0 : 1);
> But a NullPointerException was reported when I tried to get the file
> through the distributed cache.
> How do I use distributed cache files with the new API?
> I also tried to pass the file path by setting global parameters, but that
> failed too.
> How can I read "args[0]" file in mapper1 and intermediate file in mapper2