-
Custom input format query
Mapred Learn 2011-05-20, 00:44
Hi, I have implemented a custom record reader to read fixed-length records. The pseudocode is as follows:
    class CRecordReader extends RecordReader<Text, BytesWritable> {
        private FileSplit fileSplit;
        private Configuration conf;
        private int recordSize;
        private int fileSize;
        private int recordNum = 0;
        private FSDataInputStream fileIn = null;
        private Text key = null;
        private BytesWritable value = null;
        private static int offset = 0;
        private String layoutStr = "";

        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
            fileSplit = (FileSplit) inputSplit;
            final Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            fileIn = fs.open(fileSplit.getPath());
            int metaInfoLen = fileIn.readInt();
            byte[] metaStr = new byte[metaInfoLen];
            int bytesRead = fileIn.read(metaStr, 0, metaInfoLen);
            if (bytesRead <= 0) {
                System.out.println("error bytes");
            }
            String metaInfo = new String(metaStr, "US-ASCII");
            String[] fileMetadata = metaInfo.split("-");
            String fileLenStr = fileMetadata[1];
            String recordLenStr = fileMetadata[2];
            layoutStr = fileMetadata[3];
            recordSize = Integer.parseInt(recordLenStr);
            fileSize = Integer.parseInt(fileLenStr);
        }

        public boolean nextKeyValue() throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            if (key == null) {
                key = new Text();
            }

            key.set(recordNum + "-" + layoutStr);

            if (value == null) {
                value = new BytesWritable();
            }

            int bytesRead = 0;
            while (offset < fileSize) {
                byte[] record = new byte[recordSize];

                bytesRead = fileIn.read(record, 0, recordSize);

                if ((bytesRead == 0) || (bytesRead < recordSize)) {
                    key = null;
                    value = null;
                    return false;
                }
                offset += bytesRead;
                value.set(record, 0, recordSize);
            }
            return true;
        }
    }
The problem is that my input file has only 2 records, but the mapper keeps iterating over the first record again and again and never ends.
Obviously I am doing something wrong. Could somebody tell me what it is?
Thanks in advance - Jimmy
-
Re: Custom input format query
Steve Lewis 2011-05-20, 00:57
1) Add:

    @Override
    public Text getCurrentKey() { return key; }

    @Override
    public BytesWritable getCurrentValue() { return value; }

2) Do not make offset static.
3) nextKeyValue should read a single record per call, not loop with while (offset < fileSize) { ...
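Points 2 and 3 can be sketched in plain Java, without any Hadoop classes. Everything here is hypothetical and for illustration only: the class name FixedLengthReader, its constructor parameters, and the long key are not part of the posted reader or of the Hadoop API. The sketch keeps the offset as an instance field and returns after reading exactly one record per call.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Plain-Java analogue of a fixed-length record reader (hypothetical, no Hadoop types).
class FixedLengthReader {
    private final DataInputStream in;
    private final int recordSize;
    private final long dataLength;  // bytes of record data, excluding any header
    private long offset = 0;        // instance field, not static (point 2)
    private long recordNum = 0;
    private byte[] current;

    FixedLengthReader(InputStream in, int recordSize, long dataLength) {
        this.in = new DataInputStream(in);
        this.recordSize = recordSize;
        this.dataLength = dataLength;
    }

    // Reads exactly one record per call (point 3); returns false when no full record is left.
    boolean nextRecord() throws IOException {
        if (offset + recordSize > dataLength) {
            return false;
        }
        byte[] record = new byte[recordSize];
        in.readFully(record);       // blocks until the whole record has been read
        current = record;
        offset += recordSize;
        recordNum++;
        return true;
    }

    long getCurrentKey() { return recordNum; }      // 1-based record number
    byte[] getCurrentValue() { return current; }
}

class FixedLengthReaderDemo {
    public static void main(String[] args) throws IOException {
        byte[] data = "AAAABBBB".getBytes(StandardCharsets.US_ASCII);
        FixedLengthReader reader =
                new FixedLengthReader(new ByteArrayInputStream(data), 4, data.length);
        // The caller drives the loop, as the framework does with nextKeyValue().
        while (reader.nextRecord()) {
            System.out.println(reader.getCurrentKey() + " -> "
                    + new String(reader.getCurrentValue(), StandardCharsets.US_ASCII));
        }
    }
}
```

The loop lives in the caller, so each call to nextRecord() hands back one new record and the key advances with it.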
-- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
-
Re: Custom input format query
Mapred Learn 2011-05-23, 21:33
Hi Steve,
I did what you said: I removed the while loop and replaced it with:

    if (offset < fileSize) { . }

But in this case the mapper does not process the complete file. It stops partway through, after processing only a little more than half of the records in the input split. For example, fileSize is 116500004 and the last value of offset that I see in the log is 67108664. This happens in all the mappers.
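The thread does not record a resolution. One possible explanation, offered only as an assumption: the last logged offset, 67108664, sits just below 64 MiB (67108864 bytes), a common HDFS block size, and InputStream.read(byte[], int, int) is allowed to return fewer bytes than requested. The posted code treats any short read as end of input and returns false. The sketch below uses a hypothetical ChunkedStream (not a Hadoop class) to imitate such a short read, and contrasts a single read() with DataInputStream.readFully(), which keeps reading until the buffer is full. FSDataInputStream extends DataInputStream, so readFully is available on it as well.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical stream that returns at most `chunk` bytes per read call,
// imitating a stream that hands back a short read at an internal boundary.
class ChunkedStream extends FilterInputStream {
    private final int chunk;

    ChunkedStream(InputStream in, int chunk) {
        super(in);
        this.chunk = chunk;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        return super.read(b, off, Math.min(len, chunk));
    }
}

class ShortReadDemo {
    // One read() call: may legitimately return fewer bytes than recordSize.
    static int singleRead(InputStream in, int recordSize) throws IOException {
        byte[] record = new byte[recordSize];
        return in.read(record, 0, recordSize);
    }

    // readFully() loops internally until recordSize bytes arrive (or throws EOFException).
    static byte[] fullRead(InputStream in, int recordSize) throws IOException {
        byte[] record = new byte[recordSize];
        new DataInputStream(in).readFully(record, 0, recordSize);
        return record;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
        InputStream in = new ChunkedStream(new ByteArrayInputStream(data), 3);

        int got = singleRead(in, 5);
        System.out.println("single read() returned " + got + " of 5 bytes");

        byte[] record = fullRead(in, 5);
        System.out.println("readFully() filled " + record.length + " bytes");
    }
}
```

If this is the cause, checking bytesRead < recordSize after a single read() would stop the reader early even though more data remains. Note also that the posted initialize() opens the file from the beginning and never seeks to the split's start, which may matter when a file is divided into several splits.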