IOException when using MultipleSequenceFileOutputFormat
Jason Yang 2012-09-17, 13:50
Hi, all
I have written a simple MR program which partitions a file into multiple files based on the clustering result of the points in this file. Here is my code:
---
private int run() throws IOException
{
    String scheme = getConf().get(CommonUtility.ATTR_SCHEME);
    String ecgDir = getConf().get(CommonUtility.ATTR_ECG_DATA_DIR);
    String outputDir = getConf().get(CommonUtility.ATTR_OUTPUT_DIR);

    // create JobConf
    JobConf jobConf = new JobConf(getConf(), this.getClass());

    // set path for input and output
    Path inPath = new Path(scheme + ecgDir);
    Path outPath = new Path(scheme + outputDir + CommonUtility.OUTPUT_LOCAL_CLUSTERING);
    FileInputFormat.addInputPath(jobConf, inPath);
    FileOutputFormat.setOutputPath(jobConf, outPath);

    // clear output if it already existed
    CommonUtility.deleteHDFSFile(outPath.toString());

    // set format for input and output
    jobConf.setInputFormat(WholeFileInputFormat.class);
    jobConf.setOutputFormat(LocalClusterMSFOutputFormat.class);

    // set class of output key and value
    jobConf.setOutputKeyClass(Text.class);
    jobConf.setOutputValueClass(RRIntervalWritable.class);

    // set mapper and reducer
    jobConf.setMapperClass(LocalClusteringMapper.class);
    jobConf.setReducerClass(IdentityReducer.class);

    // run the job
    JobClient.runJob(jobConf);
    return 0;
}

...

public class LocalClusteringMapper extends MapReduceBase implements
        Mapper<NullWritable, BytesWritable, Text, RRIntervalWritable>
{
    @Override
    public void map(NullWritable key, BytesWritable value,
            OutputCollector<Text, RRIntervalWritable> output, Reporter reporter)
            throws IOException
    {
        // read and cluster
        ...

        // output
        Iterator<RRIntervalWritable> it = rrArray.iterator();
        while (it.hasNext())
        {
            RRIntervalWritable rr = it.next();

            Text outputKey = new Text(rr.clusterResult);

            output.collect(outputKey, rr);
        }
    }

...

public class LocalClusterMSFOutputFormat extends
        MultipleSequenceFileOutputFormat<Text, RRIntervalWritable>
{
    protected String generateFileNameForKeyValue(Text key,
            RRIntervalWritable value, String name)
    {
        return value.clusterResult.toString();
    }
}
---
But this program always gets an IOException when running in a pseudo-distributed cluster; the log is attached at the end of this post.

There's something weird:
1. If I use SequenceFileOutputFormat instead of MultipleSequenceFileOutputFormat, this program works fine (at least there is no error in the log).
2. The file which always causes the error is EcgData002509_LCF_3.
> 12/09/17 21:10:35 INFO mapred.MapTask: Starting flush of map output
> 12/09/17 21:10:35 INFO mapred.MapTask: Finished spill 0
> 12/09/17 21:10:35 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000019_0 is done. And is in the process of commiting
> 12/09/17 21:10:35 INFO mapred.LocalJobRunner:
> 12/09/17 21:10:35 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000019_0' done.
> 12/09/17 21:10:35 INFO mapred.LocalJobRunner:
> 12/09/17 21:10:35 INFO mapred.Merger: Merging 20 sorted segments
> 12/09/17 21:10:35 INFO mapred.Merger: Merging 2 intermediate segments out of a total of 20
> 12/09/17 21:10:35 INFO mapred.Merger: Merging 10 intermediate segments out of a total of 19
> 12/09/17 21:10:35 INFO mapred.Merger: Down to the last merge-pass, with 10 segments left of total size: 18913891 bytes
> 12/09/17 21:10:35 INFO mapred.LocalJobRunner:
> 12/09/17 21:10:39 WARN hdfs.DFSClient: DataStreamer Exception: org.apache.hadoop.ipc.RemoteException: java.io.IOException: File /work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3 could only be replicated to 0 nodes, instead of 1
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:1271)
>     at org.apache.hadoop.hdfs.server.namenode.NameNode.addBlock(NameNode.java:422)
>     at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>     at java.lang.reflect.Method.invoke(Method.java:597)

YANG, Lin
Re: IOException when using MultipleSequenceFileOutputFormat
Harsh J 2012-09-18, 02:38
Hi Jason,
How many unique keys are you going to be generating from this program, roughly?
By default, the max-load of a DN is about 4k threads, and if you're trying to push beyond that value then the NN will no longer select the DN, as it would consider it already overloaded. In a fully distributed mode, you may not see this issue as there are several DNs and TTs to distribute the write load across.

Try with a smaller input sample if there's a whole lot of keys you'll be creating files for, and see if that works instead (such that there are fewer files and you do not hit the xceiver/load limits).
Harsh J
Re: IOException when using MultipleSequenceFileOutputFormat
Jason Yang 2012-09-18, 03:44
Hey, Harsh
Thanks for your reply.
There are 20 data files as input, and each of them is clustered into 4 groups. Since I use "DataFileName-groupNum" as the output key, there are 80 unique keys in total.
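To make that arithmetic concrete, here is a minimal sketch in plain Java with no Hadoop dependency. It assumes that the old `mapred` `MultipleOutputFormat` keeps one writer open per distinct generated file name until the task closes, so a single reducer would hold all of them open at once. The `EcgData%06d_LCF_%d` pattern is inferred from the file name in the log; the class name, method names, and the record count of 100 are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model, not Hadoop code: mimics a per-file-name writer cache.
class OpenWriterModel {
    // One entry per distinct output file name; the value counts records written.
    private final Map<String, Integer> openWriters = new TreeMap<String, Integer>();

    // Stand-in for generateFileNameForKeyValue(): the name is the cluster label.
    static String fileNameFor(int fileId, int group) {
        return String.format("EcgData%06d_LCF_%d", fileId, group);
    }

    // A "writer" is created the first time a name is seen and then reused.
    void write(String fileName) {
        Integer n = openWriters.get(fileName);
        openWriters.put(fileName, n == null ? 1 : n + 1);
    }

    int openWriterCount() {
        return openWriters.size();
    }

    // Feeds every record through one model instance, as a single reducer would.
    static int simulate(int inputFiles, int groupsPerFile, int recordsPerGroup) {
        OpenWriterModel model = new OpenWriterModel();
        for (int f = 0; f < inputFiles; f++) {
            for (int g = 0; g < groupsPerFile; g++) {
                for (int r = 0; r < recordsPerGroup; r++) {
                    model.write(fileNameFor(f, g));
                }
            }
        }
        return model.openWriterCount();
    }

    public static void main(String[] args) {
        System.out.println(simulate(20, 4, 100)); // prints 80
        System.out.println(simulate(5, 4, 100));  // prints 20
    }
}
```

The count depends only on the number of distinct names, not on the number of records, which is why the 5-file run opens a quarter as many files as the 20-file run.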
According to your suggestion, I ran the following two tests:

1) Try a smaller input: I chose 5 files at random as input, and it always works fine.
2) Run it on a fully-distributed cluster: it always works fine with 20 input data files on the fully-distributed cluster, while it always fails on the pseudo-distributed cluster.

So it seems to be related to the xceiver/load limits you mentioned, and I have changed the xceiver value in hdfs-site.xml:

<property>
  <name>dfs.datanode.max.xcievers</name>
  <value>4096</value>
</property>

but I still get the same error when running with 20 input data files on the pseudo-distributed cluster.
How could I fix this problem?
YANG, Lin
Re: IOException when using MultipleSequenceFileOutputFormat
Harsh J 2012-09-18, 03:59
Jason,
Perhaps then go with Jay's lead here: ulimits (over nproc and nofile mostly). Can you check if they are adequately high for opening several blocks+sockets, for the user that runs the DataNode and for the user that runs the TaskTracker (if insecure mode)?
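One way to see the limits that a running daemon actually has, rather than those of your login shell, is to read `/proc/<pid>/limits`. The sketch below is Linux-specific and hedged: the `ProcLimits` class and `softLimit` helper are hypothetical names, the row labels "Max open files" and "Max processes" are the ones that file uses on Linux, and reading another process's limits may require running as the same user or as root.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical helper: prints soft limits of a process from /proc/<pid>/limits.
class ProcLimits {
    // Returns the soft-limit column of the row starting with limitName,
    // or null when no such row exists in the given text.
    static String softLimit(String limitsText, String limitName) {
        for (String line : limitsText.split("\n")) {
            if (line.startsWith(limitName)) {
                String rest = line.substring(limitName.length()).trim();
                return rest.split("\\s+")[0];
            }
        }
        return null;
    }

    public static void main(String[] args) throws IOException {
        // With no argument, inspect this JVM itself ("self").
        String pid = args.length > 0 ? args[0] : "self";
        byte[] raw = Files.readAllBytes(Paths.get("/proc", pid, "limits"));
        String text = new String(raw, StandardCharsets.UTF_8);
        System.out.println("Max open files: " + softLimit(text, "Max open files"));
        System.out.println("Max processes:  " + softLimit(text, "Max processes"));
    }
}
```

Run it with the DataNode's pid (as reported by `jps`) as the only argument, then again with the TaskTracker's pid, and compare the values with the number of files and sockets the job needs.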
Harsh J
Re: IOException when using MultipleSequenceFileOutputFormat
Jason Yang 2012-09-18, 06:07
Harsh and Jay,
All the limits on my system are:

yanglin@ubuntu:~$ ulimit -a
core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 32059
max locked memory       (kbytes, -l) 64
max memory size         (kbytes, -m) unlimited
open files                      (-n) 1024
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 8192
cpu time               (seconds, -t) unlimited
max user processes              (-u) 32059
virtual memory          (kbytes, -v) unlimited
file locks                      (-x) unlimited
---
Is there any way I could check which limit has been reached while the MapReduce job is running?
BTW, I have a stupid mapper which reads HDFS files in the map() function, but I do call FSDataInputStream.close() to close the file at the end of map(). Does that matter for this problem?
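On the close() point, one thing that is certain in Java: a close() call placed at the end of a method is skipped whenever an exception is thrown earlier in that method. Whether that contributes to the error in this thread is not established. The sketch below only illustrates the difference, using a hypothetical `FakeStream` in place of a real HDFS stream.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical illustration, not Hadoop code.
class CloseInFinally {
    // Stand-in for a stream opened inside map().
    static class FakeStream implements Closeable {
        boolean closed = false;

        void read(boolean fail) throws IOException {
            if (fail) {
                throw new IOException("simulated read failure");
            }
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // close() only at the end of the work: skipped if read() throws.
    static boolean closeAtEnd(boolean fail) {
        FakeStream in = new FakeStream();
        try {
            in.read(fail);
            in.close();
        } catch (IOException e) {
            // swallowed here only so the flag can be inspected afterwards
        }
        return in.closed;
    }

    // close() in finally: runs whether or not read() throws.
    static boolean closeInFinally(boolean fail) {
        FakeStream in = new FakeStream();
        try {
            in.read(fail);
        } catch (IOException e) {
            // swallowed here only so the flag can be inspected afterwards
        } finally {
            in.close();
        }
        return in.closed;
    }

    public static void main(String[] args) {
        System.out.println(closeAtEnd(true));     // prints false
        System.out.println(closeInFinally(true)); // prints true
    }
}
```

In a real mapper the same shape would wrap the HDFS read, so that a failed map() attempt does not leave the file handle open.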
YANG, Lin
Re: IOException when using MultipleSequenceFileOutputFormat
Jay Vyas 2012-09-18, 03:24
>> If I use the SequenceFileOutputFormat instead of MultipleSequenceFileOutputFormat, this program would works fine( at least there is no error in log). <<
I might suggest another possible fix: maybe your ulimit is too low in your pseudo-distributed OS? The fact that you are using a clustering output means you will have some funny files, maybe a lot of very small ones, and possibly more of them than you would normally distribute to a single node, as Harsh suggests.
Jay Vyas MMSB/UCHC
Re: IOException when using MultipleSequenceFileOutputFormat
Hien Luu 2012-09-17, 16:35
I ran into a similar problem the other day. It turns out the datanode was not running.
Type 'jps' to see if the datanode process is up and running.
Hien
Re: IOException when using MultipleSequenceFileOutputFormat
Jason Yang 2012-09-17, 17:00
hey, Hien~
The datanode is running.
yanglin@ubuntu:~$ jps
14475 SecondaryNameNode
2642
14838 TaskTracker
14550 JobTracker
13877 NameNode
14177 DataNode
18811 Jps

I have found that if I use MultipleSequenceFileOutputFormat instead of LocalClusterMSFOutputFormat, this program works fine. Here is my subclass, which returns a file name according to the clustering result:

public class LocalClusterMSFOutputFormat extends
        MultipleSequenceFileOutputFormat<Text, RRIntervalWritable>
{
    protected String generateFileNameForKeyValue(Text key,
            RRIntervalWritable value, String name)
    {
        return value.clusterResult.toString();
    }
}

Is there any limit on the path length or on the number of files in a single directory?
YANG, Lin
Jason Yang 2012-09-17, 17:00
Re: IOException when using MultipleSequenceFileOutputFormat
Jason Yang 2012-09-17, 16:11
I checked the NameNode log and found this:
----
2012-09-18 00:01:48,056 WARN org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Not able to place enough replicas, still in need of 1
2012-09-18 00:01:48,057 INFO org.apache.hadoop.ipc.Server: IPC Server handler 9 on 8020, call addBlock(/work/output4/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData003637_LCF_0, DFSClient_1720567089) from 127.0.0.1:46611: error: java.io.IOException: File /work/output4/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData003637_LCF_0 could only be replicated to 0 nodes, instead of 1
java.io.IOException: File /work/output4/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData003637_LCF_0 could only be replicated to 0 nodes, instead of 1
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:1271)
    at org.apache.hadoop.hdfs.server.namenode.NameNode.addBlock(NameNode.java:422)
    ...
----
Is it related to this problem?
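The file name in the addBlock call, EcgData003637_LCF_0, is the kind of name generateFileNameForKeyValue returns for one cluster result. As far as I understand the old-API MultipleOutputFormat, it lazily creates one underlying writer per distinct generated name and keeps all of them open until the task closes. The plain-Java sketch below only simulates that routing to show how the number of open files grows with the number of distinct cluster results. It uses no Hadoop classes; PerNameWriterSketch, FakeWriter and the record strings are hypothetical. Whether the number of open files has anything to do with the replication error is not established in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PerNameWriterSketch {

    // Stand-in for one open output file: it just remembers what was written.
    static final class FakeWriter {
        final List<String> records = new ArrayList<>();
    }

    // One writer per distinct file name, created on first use and kept until
    // the whole output is closed.
    private final Map<String, FakeWriter> openWriters = new TreeMap<>();

    // Plays the role of generateFileNameForKeyValue in the post above:
    // the file name is simply the cluster result.
    public static String fileNameFor(String clusterResult) {
        return clusterResult;
    }

    // Routes a record to the writer for its file name, creating it if needed.
    public void write(String clusterResult, String record) {
        openWriters
            .computeIfAbsent(fileNameFor(clusterResult), n -> new FakeWriter())
            .records.add(record);
    }

    // Number of writers (that is, output files) currently held open.
    public int openWriterCount() {
        return openWriters.size();
    }

    // Number of records routed to the given file name so far.
    public int recordCount(String fileName) {
        FakeWriter w = openWriters.get(fileName);
        return w == null ? 0 : w.records.size();
    }

    public static void main(String[] args) {
        PerNameWriterSketch out = new PerNameWriterSketch();
        out.write("EcgData003637_LCF_0", "rr-1");
        out.write("EcgData003637_LCF_0", "rr-2");
        out.write("EcgData002509_LCF_3", "rr-3");
        System.out.println("open writers: " + out.openWriterCount());
    }
}
```

With a plain SequenceFileOutputFormat there is a single writer per task, which is one difference between the two setups worth keeping in mind when comparing them.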
2012/9/17 Jason Yang <[EMAIL PROTECTED]>
> Hi, all
>
> I have written a simple MR program which partitions a file into multiple
> files based on the clustering result of the points in this file. Here is my
> code:
> ---
> ...
> ---
>
> But this program always gets an IOException when running in a
> pseudo-distributed cluster, and the log has been attached at the end of
> this post.
>
> There's something weird:
> 1. If I use SequenceFileOutputFormat instead of
> MultipleSequenceFileOutputFormat, this program works fine (at least
> there is no error in the log).
> 2. The file which always causes the error is EcgData002509_LCF_3
>
>> 12/09/17 21:10:35 INFO mapred.MapTask: Starting flush of map output
>> 12/09/17 21:10:35 INFO mapred.MapTask: Finished spill 0
>> 12/09/17 21:10:35 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000019_0 is done. And is in the process of commiting
>> 12/09/17 21:10:35 INFO mapred.LocalJobRunner:

YANG, Lin