DistributedCache does not seem to copy the HDFS files to local

Hi All,

  I want to use the DistributedCache to perform a replicated join on the
map side. My Java code is shown in [1][2].
  When I run the job, the file I want to cache is not copied to the local
dir on my DN, so a FileNotFoundException is thrown [3].

  I also checked the source code of DistributedCache.java [4]. It shows
that addCacheFile() only sets up the conf; there is no copy phase that
caches the file to the local dir.

  My questions are:
  1. How can I access the DistributedCache files?
  2. Does the DistributedCache do the work of copying the HDFS file to
the local dir?

 [1] My driver class:
job = new Job(conf, "join work");
DistributedCache.addCacheFile(
    new URI("hdfs://Hadoop01:8040/user/hadoop/sqoop/CMTSIFTABLE/part-m-00000"),
    job.getConfiguration());
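
(For reference, DistributedCache also accepts a URI fragment, which the
framework uses as a symlink name in the task's working directory. A minimal
sketch of that variant; the #cmts_lookup name is just an example, not from
my real job:)

DistributedCache.addCacheFile(
    new URI("hdfs://Hadoop01:8040/user/hadoop/sqoop/CMTSIFTABLE/part-m-00000#cmts_lookup"),
    job.getConfiguration());
DistributedCache.createSymlink(job.getConfiguration());
// The mapper can then open the file by the symlink name "cmts_lookup",
// relative to its working directory.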

[2] The setup function of my Mapper class:

@Override
public void setup(Context context) {
    try {
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        // URI[] cacheFiles = DistributedCache.getCacheFiles(context.getConfiguration());
        // System.out.print("-----cacheFiles-----" + cacheFiles.length);
        if (cacheFiles != null && cacheFiles.length > 0) {
            String line;
            String[] tokens;
            BufferedReader joinReader = new BufferedReader(
                new FileReader(cacheFiles[0].toString()));
            while ((line = joinReader.readLine()) != null) {
                tokens = StringUtils.split(line, '|');
                if (tokens != null && tokens.length >= 3) {
                    // build the in-memory lookup table for the map-side join
                    joinData.put(tokens[0], tokens[1]);
                }
            }
            joinReader.close();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
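
(If the local copy is the problem, one workaround I am considering is to
read the cached file straight through the FileSystem API, using the HDFS
URIs returned by getCacheFiles(). A sketch, assuming the usual
org.apache.hadoop.fs imports; the parsing loop is the same as above:)

// Open the cache file via the FileSystem API instead of a local path,
// so it works whether or not the framework has localized the file.
URI[] uris = DistributedCache.getCacheFiles(context.getConfiguration());
if (uris != null && uris.length > 0) {
    FileSystem fs = FileSystem.get(uris[0], context.getConfiguration());
    BufferedReader joinReader = new BufferedReader(
        new InputStreamReader(fs.open(new Path(uris[0].getPath()))));
    // ... same parsing loop as above ...
    joinReader.close();
}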

[3] The FileNotFoundException:

java.io.FileNotFoundException: file:/tmp/hadoop-hadoop/mapred/local/4990846200525419138/part-m-00000 (No such file or directory)
    at java.io.FileInputStream.open(Native Method)
    at java.io.FileInputStream.<init>(FileInputStream.java:120)
    at java.io.FileInputStream.<init>(FileInputStream.java:79)
    at java.io.FileReader.<init>(FileReader.java:41)
    at com.jhetl.hadoop.map.CMTS.IFSNMapper.setup(IFSNMapper.java:46)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:725)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:232)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

[4] Source of DistributedCache.addCacheFile():
  /**
   * Add a file to be localized to the conf.  Intended
   * to be used by user code.
   * @param uri The uri of the cache to be localized
   * @param conf Configuration to add the cache to
   * @deprecated Use {@link Job#addCacheFile(URI)} instead
   */
  @Deprecated
  public static void addCacheFile(URI uri, Configuration conf) {
    String files = conf.get(MRJobConfig.CACHE_FILES);
    conf.set(MRJobConfig.CACHE_FILES,
             files == null ? uri.toString() : files + "," + uri.toString());
  }
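
So addCacheFile() only records the URI under MRJobConfig.CACHE_FILES; the
copy to the task-local directory would have to be done by the framework
before the task runs, not by this call. A quick sanity check of what got
recorded (sketch; "mapreduce.job.cache.files" is the value of
MRJobConfig.CACHE_FILES in this code path):

// Inspect the raw cache-file list that addCacheFile() stored in the conf;
// the framework, not addCacheFile(), performs the localization.
System.out.println(job.getConfiguration().get("mapreduce.job.cache.files"));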