-
race condition in hadoop 0.20.2 (cdh3u1)
Stan Rosenberg 2012-01-17, 21:21
Hi,
This posting is essentially about a bug, but it is also related to a programming idiom endemic to Hadoop. Thus, I am posting to 'common-user' rather than 'common-dev'; if the latter is more appropriate, please let me know. Also, I checked JIRA and was unable to find a matching bug.
Synopsis
========
It appears that there is a race condition between the spill thread and the main thread. The race results in data corruption, which surfaces as an ArrayIndexOutOfBoundsException in org.apache.hadoop.mapred.MapTask at line 1134 when running Hadoop 0.20.2 (cdh3u1). The race occurs when the spill thread is executing 'readFields' on one or more keys while the main thread is concurrently executing 'write' to serialize the mapper's output. Although this is Cloudera's distribution, I believe the problem also affects Apache's distribution.
Details
=======
Now let's delve into the details. To avoid object allocation, we are led to rely on the singleton pattern. Consider this code snippet, which illustrates a custom key implementation that, for the sake of performance, re-uses a single instance of Text:

    class MyKey<T> implements WritableComparable<T> {
        private String ip; // first part of the key
        private final static Text DUMMY = new Text();
        // ...

        public void write(DataOutput out) throws IOException {
            // serialize the first part of the key
            DUMMY.set(ip);
            DUMMY.write(out);
            // ...
        }

        public void readFields(DataInput in) throws IOException {
            // de-serialize the first part of the key
            DUMMY.readFields(in);
            ip = DUMMY.toString();
            // ...
        }
    }

The problem with the above is that DUMMY is a shared (static) variable, which would not matter in a single-threaded environment. However, the spill thread and the main thread run concurrently and, as the stack traces below show, a data race ensues. Note that the actual exception is java.lang.ArrayIndexOutOfBoundsException in org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:1134). To produce the stack traces, I caught the actual exception inside 'write' of my custom key class and dumped the stack traces of all running threads.

    Thread[main,5,main]
    java.lang.Thread.dumpThreads(Native Method)
    java.lang.Thread.getAllStackTraces(Unknown Source)
    com.proclivitysystems.etl.psguid.PSguidOrTid.write(PSguidOrTid.java:206)
    org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:90)
    org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:77)
    org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:918)
    org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:574)
    org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    com.proclivitysystems.etl.psguid.WECPreprocessor.map(WECPreprocessor.java:142)
    com.proclivitysystems.etl.psguid.WECPreprocessor.map(WECPreprocessor.java:25)
    org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:647)
    org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)
    org.apache.hadoop.mapred.Child$4.run(Child.java:270)
    java.security.AccessController.doPrivileged(Native Method)
    javax.security.auth.Subject.doAs(Unknown Source)
    org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
    org.apache.hadoop.mapred.Child.main(Child.java:264)

    Thread[SpillThread,5,main]
    java.lang.Thread.dumpThreads(Native Method)
    java.lang.Thread.getAllStackTraces(Unknown Source)
    com.proclivitysystems.etl.psguid.PSguidOrTid.readFields(PSguidOrTid.java:240)
    org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:125)
    org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare(MapTask.java:968)
    org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:101)
    org.apache.hadoop.util.QuickSort.sort(QuickSort.java:59)
    org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1254)
    org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$1800(MapTask.java:712)
    org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask.java:1199)

    java.lang.RuntimeException: java.lang.ArrayIndexOutOfBoundsException
        at com.proclivitysystems.etl.psguid.PSguidOrTid.write(PSguidOrTid.java:214)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:90)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:77)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:918)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:574)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at com.proclivitysystems.etl.psguid.WECPreprocessor.map(WECPreprocessor.java:142)
        at com.proclivitysystems.etl.psguid.WECPreprocessor.map(WECPreprocessor.java:25)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:647)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Unknown Source)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
        at org.apache.hadoop.mapred.Child.main(Child.java:264)
    Caused by: java.lang.ArrayIndexOutOfBoundsException
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:1134)
        at java.io.DataOutputStream.write(Unknown Source)
        at org.apache.hadoop.io.Text.write(Text.java:282)
        at com.proclivitysystems.etl.psguid.PSguidOrTid.write(PSguidOrTid.java:202)
        ... 15 more
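To make the interleaving concrete, here is a minimal, Hadoop-free sketch that replays one possible schedule by hand on a single thread, so the outcome is deterministic. Assumptions: the hypothetical classes SharedScratchKey and InterleavingDemo stand in for MyKey, a static StringBuilder stands in for the static Text DUMMY, writeUTF/readUTF replace Text's wire format, and write() is split into stage() and flush() only to expose the point where the spill thread can step in. In this simplified version the symptom is a silently wrong key rather than an exception; with the real Text, whose internal byte array and length are overwritten by readFields, a similar interleaving is one plausible route to the ArrayIndexOutOfBoundsException at MapTask.java:1134.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for MyKey: a static StringBuilder plays the role of
// the shared static Text DUMMY; writeUTF/readUTF replace Text's format.
class SharedScratchKey {
    static final StringBuilder DUMMY = new StringBuilder(); // shared by every instance
    String ip;

    SharedScratchKey(String ip) { this.ip = ip; }

    // First half of write(): corresponds to DUMMY.set(ip)
    void stage() {
        DUMMY.setLength(0);
        DUMMY.append(ip);
    }

    // Second half of write(): corresponds to DUMMY.write(out), which emits
    // whatever DUMMY holds at this moment, not necessarily this key's ip.
    void flush(DataOutput out) throws IOException {
        out.writeUTF(DUMMY.toString());
    }

    // Corresponds to DUMMY.readFields(in); ip = DUMMY.toString();
    void readFields(DataInput in) throws IOException {
        DUMMY.setLength(0);
        DUMMY.append(in.readUTF());
        ip = DUMMY.toString();
    }
}

class InterleavingDemo {
    // Replays one unlucky schedule step by step and returns the value the
    // "main thread" actually emitted for its key.
    static String replay(String mainIp, String spilledIp) {
        try {
            // Serialized key that the "spill thread" reads back while sorting.
            ByteArrayOutputStream spilled = new ByteArrayOutputStream();
            new DataOutputStream(spilled).writeUTF(spilledIp);

            SharedScratchKey mainKey = new SharedScratchKey(mainIp);
            SharedScratchKey spillKey = new SharedScratchKey(null);
            ByteArrayOutputStream collected = new ByteArrayOutputStream();

            mainKey.stage();                                  // main thread
            spillKey.readFields(new DataInputStream(          // spill thread
                    new ByteArrayInputStream(spilled.toByteArray())));
            mainKey.flush(new DataOutputStream(collected));   // main thread resumes

            return new DataInputStream(
                    new ByteArrayInputStream(collected.toByteArray())).readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Prints 192.168.1.7: the main thread's key was serialized with the
        // value the spill thread had just read into the shared DUMMY.
        System.out.println(replay("10.0.0.1", "192.168.1.7"));
    }
}
```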
Thanks,
stan
P.S. Upon examining the code in MapTask, it is clear that the spillLock is released before line 1134 where the ArrayIndexOutOfBoundsExcepti
-
Re: race condition in hadoop 0.20.2 (cdh3u1)
Brock Noland 2012-01-17, 23:38
Hi,
tl;dr DUMMY should not be static.
On Tue, Jan 17, 2012 at 3:21 PM, Stan Rosenberg <[EMAIL PROTECTED]> wrote:
>
> class MyKey<T> implements WritableComparable<T> {
>    private String ip; // first part of the key
>    private final static Text DUMMY = new Text();
>    ...
>
>    public void write(DataOutput out) throws IOException {
>       // serialize the first part of the key
>       DUMMY.set(ip);
>       DUMMY.write(out);
>       ...
>    }
>
>    public void readFields(DataInput in) throws IOException {
>       // de-serialize the first part of the key
>       DUMMY.readFields(in);
>       ip = DUMMY.toString();
>       ....
>    }
> }
This class is invalid. A single thread will be executing your mapper or reducer, but there will be multiple threads (background threads such as the SpillThread) creating MyKey instances, which is exactly what you are seeing. This is by design.
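Dropping `static` means each key object owns its scratch buffer, so two threads working on different key objects no longer share mutable state. Below is a minimal, Hadoop-free sketch of that change, assuming a hypothetical PerInstanceKey class in which a StringBuilder stands in for org.apache.hadoop.io.Text and writeUTF/readUTF stand in for Text's serialization; it illustrates ownership of the scratch object, not Text's allocation savings. Object reuse is preserved at the per-object level: the same writer and reader are reused for every record, which stays safe as long as a single key object is never shared between threads.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for MyKey with DUMMY turned into an instance field.
class PerInstanceKey {
    private String ip;                                        // first part of the key
    private final StringBuilder dummy = new StringBuilder();  // owned by this key only

    void set(String ip) { this.ip = ip; }
    String get() { return ip; }

    void write(DataOutput out) throws IOException {
        dummy.setLength(0);      // reuse this object's buffer
        dummy.append(ip);
        out.writeUTF(dummy.toString());
    }

    void readFields(DataInput in) throws IOException {
        dummy.setLength(0);
        dummy.append(in.readUTF());
        ip = dummy.toString();
    }
}

class PerInstanceDemo {
    // Serializes `ip` with one key object and deserializes it into another,
    // loosely mirroring the collect path and the comparator's read path.
    static String roundTrip(PerInstanceKey writer, PerInstanceKey reader, String ip) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            writer.set(ip);
            writer.write(new DataOutputStream(buf));
            reader.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
            return reader.get();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        PerInstanceKey writer = new PerInstanceKey();
        PerInstanceKey reader = new PerInstanceKey();
        // The same two objects are reused for every record.
        System.out.println(roundTrip(writer, reader, "10.0.0.1"));    // 10.0.0.1
        System.out.println(roundTrip(writer, reader, "192.168.1.7")); // 192.168.1.7
    }
}
```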
Brock
-
Re: race condition in hadoop 0.20.2 (cdh3u1)
Stan Rosenberg 2012-01-17, 23:51
On Tue, Jan 17, 2012 at 6:38 PM, Brock Noland <[EMAIL PROTECTED]> wrote:
> This class is invalid. A single thread will be executing your mapper
> or reducer but there will be multiple threads (background threads such
> as the SpillThread) creating MyKey instances which is exactly what you
> are seeing. This is by design.
>
Could you please point me to where this design decision/assumption is (or was) documented? IMHO, this assumption clashes with the overall object re-use methodology. I would at least have considered making 'readFields' and 'write' synchronized, if only to indicate that multiple threads execute serialization/de-serialization. (Since only a few threads compete in this case, the performance penalty would have been negligible.)
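On the synchronization idea: declaring write and readFields as synchronized instance methods would lock `this`, that is, each key object separately. The SpillThread deserializes into the comparator's own key objects (WritableComparator.compare in its trace), not into the key the mapper is writing, so both threads could still reach a static DUMMY at the same time. One way to keep a shared, reusable scratch object per class without any lock is java.lang.ThreadLocal, which gives every thread its own copy; this is a swapped-in technique, not one proposed in this thread. The sketch assumes a hypothetical ThreadLocalScratchKey, with StringBuilder standing in for Text, plus a small two-thread harness (ThreadLocalDemo) that counts corrupted round trips.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical key: keeps a reusable scratch object, but one per thread.
class ThreadLocalScratchKey {
    // Each thread that calls SCRATCH.get() lazily receives its own StringBuilder.
    private static final ThreadLocal<StringBuilder> SCRATCH =
            ThreadLocal.withInitial(StringBuilder::new);
    private String ip;

    void set(String ip) { this.ip = ip; }
    String get() { return ip; }

    void write(DataOutput out) throws IOException {
        StringBuilder sb = SCRATCH.get();
        sb.setLength(0);
        sb.append(ip);
        out.writeUTF(sb.toString());
    }

    void readFields(DataInput in) throws IOException {
        StringBuilder sb = SCRATCH.get();
        sb.setLength(0);
        sb.append(in.readUTF());
        ip = sb.toString();
    }
}

class ThreadLocalDemo {
    // Two threads round-trip `perThread` distinct values each, concurrently,
    // through the shared static ThreadLocal; returns how many values came
    // back different from what was written.
    static int mismatches(int perThread) {
        AtomicInteger bad = new AtomicInteger();
        Thread[] threads = new Thread[2];
        for (int t = 0; t < threads.length; t++) {
            final String prefix = "thread" + t + "-";
            threads[t] = new Thread(() -> {
                ThreadLocalScratchKey writer = new ThreadLocalScratchKey();
                ThreadLocalScratchKey reader = new ThreadLocalScratchKey();
                for (int i = 0; i < perThread; i++) {
                    String expected = prefix + i;
                    try {
                        ByteArrayOutputStream buf = new ByteArrayOutputStream();
                        writer.set(expected);
                        writer.write(new DataOutputStream(buf));
                        reader.readFields(new DataInputStream(
                                new ByteArrayInputStream(buf.toByteArray())));
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                    if (!expected.equals(reader.get())) bad.incrementAndGet();
                }
            });
            threads[t].start();
        }
        for (Thread th : threads) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return bad.get();
    }

    public static void main(String[] args) {
        System.out.println(mismatches(100_000));
    }
}
```

With a plain static StringBuilder in place of the ThreadLocal, the count is not guaranteed to be zero, because the outcome then depends on thread timing. ThreadLocal trades the single shared object for one object per thread, which keeps allocation bounded by the number of threads.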
Thanks,
stan