Avro >> mail # user >> Can spill to disk be in compressed Avro format to reduce I/O?


Re: Can spill to disk be in compressed Avro format to reduce I/O?


On 1/12/12 11:24 AM, "Frank Grimes" <[EMAIL PROTECTED]> wrote:

> Hi Scott,
>
> If I have a map-only job, would I want only one mapper running to pull all the
> records from the source input files and stream/append them to the target avro
> file?
> Would that be no different (or more efficient) than doing "hadoop dfs -cat
> file1 file2 file3" and piping the output to append to a "hadoop dfs -put
> combinedFile"?
> In that case, my only question is how would I combine the avro files into a
> new file without deserializing them?

It would be different.  An Avro file has a header that contains the schema
and compression codec info along with other metadata, followed by data
blocks.  Each data block is prefixed with a record count and a byte size,
and is followed by a 16-byte delimiter (the sync marker).  You cannot simply
concatenate files because the schema or compression codec may differ, a
header in the middle of the file is not allowed, and the delimiter may differ.
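
To make the layout described above concrete, here is a rough sketch that reads only the header of an Avro data file using plain java.io. It is not taken from the Avro code base; the names AvroHeaderPeek, Header and readHeader are made up for illustration, and it assumes the container layout from the Avro specification (4-byte magic, a metadata map, then the 16-byte delimiter). Metadata values are shown as UTF-8 text, which is an assumption that holds for the schema and codec entries.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only.
final class AvroHeaderPeek {

    /** Header fields: metadata (e.g. avro.schema, avro.codec) and the 16-byte delimiter. */
    static final class Header {
        final Map<String, String> meta = new LinkedHashMap<>();
        final byte[] sync = new byte[16];
    }

    // Avro longs are variable-length, zigzag-encoded integers.
    static long readLong(InputStream in) throws IOException {
        long raw = 0;
        int shift = 0;
        while (true) {
            int b = in.read();
            if (b < 0) throw new EOFException("truncated varint");
            raw |= ((long) (b & 0x7F)) << shift;
            if ((b & 0x80) == 0) break;
            shift += 7;
        }
        return (raw >>> 1) ^ -(raw & 1);
    }

    // Avro bytes/strings are a long length followed by that many bytes.
    static byte[] readBytes(DataInputStream in) throws IOException {
        long len = readLong(in);
        if (len < 0 || len > Integer.MAX_VALUE) throw new IOException("bad length: " + len);
        byte[] buf = new byte[(int) len];
        in.readFully(buf);
        return buf;
    }

    static Header readHeader(InputStream raw) throws IOException {
        DataInputStream in = new DataInputStream(raw);
        byte[] magic = new byte[4];
        in.readFully(magic);
        if (!Arrays.equals(magic, new byte[] {'O', 'b', 'j', 1})) {
            throw new IOException("not an Avro data file");
        }
        Header h = new Header();
        // The metadata map is a series of blocks, ended by a block with count 0.
        for (long n = readLong(in); n != 0; n = readLong(in)) {
            if (n < 0) {          // a negative count is followed by the block's byte size
                n = -n;
                readLong(in);
            }
            for (long i = 0; i < n; i++) {
                String key = new String(readBytes(in), StandardCharsets.UTF_8);
                String value = new String(readBytes(in), StandardCharsets.UTF_8);
                h.meta.put(key, value);
            }
        }
        in.readFully(h.sync);     // every data block in this file ends with these 16 bytes
        return h;
    }

    public static void main(String[] args) throws IOException {
        try (InputStream in = new FileInputStream(args[0])) {
            Header h = readHeader(in);
            System.out.println("codec:  " + h.meta.get("avro.codec"));
            System.out.println("schema: " + h.meta.get("avro.schema"));
            System.out.println("sync:   " + Arrays.toString(h.sync));
        }
    }
}
```

Running this against two of your input files should show why "-cat" is not enough: each file carries its own header and its own delimiter, so a reader that picked up the delimiter from the first header would not recognize the blocks of the second file.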

http://avro.apache.org/docs/current/api/java/org/apache/avro/file/DataFileWriter.html

DataFileWriter can append the contents of a pre-existing file with the same
schema.  In particular, look at the documentation for appendAllFrom():
http://avro.apache.org/docs/current/api/java/org/apache/avro/file/DataFileWriter.html#appendAllFrom%28org.apache.avro.file.DataFileStream,%20boolean%29
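
A minimal sketch of how appendAllFrom() could be used to combine files without deserializing the records. The class AvroConcat and the method concat are hypothetical names, and the sketch works on local files; for HDFS you would presumably pass streams obtained from the Hadoop FileSystem API to the stream-based constructors and create() overloads instead, which I have not tested. It assumes every input has the same schema (appendAllFrom() should fail otherwise) and reuses the codec named in the first input's "avro.codec" metadata, so that deflate blocks can be copied as they are.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

// Hypothetical helper, for illustration only.
final class AvroConcat {

    /** Copies the data blocks of all inputs into one new Avro file. */
    static void concat(List<File> inputs, File output) throws IOException {
        DataFileWriter<GenericRecord> writer = null;
        try {
            for (File input : inputs) {
                try (DataFileStream<GenericRecord> in = new DataFileStream<>(
                        new FileInputStream(input), new GenericDatumReader<GenericRecord>())) {
                    if (writer == null) {
                        // Take schema and codec from the first input file.
                        Schema schema = in.getSchema();
                        writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
                        String codec = in.getMetaString("avro.codec");
                        if (codec != null) {
                            writer.setCodec(CodecFactory.fromString(codec));
                        }
                        writer.create(schema, output);
                    }
                    // false: copy blocks as-is when the codecs are compatible,
                    // rather than decompressing and recompressing them.
                    writer.appendAllFrom(in, false);
                }
            }
        } finally {
            if (writer != null) {
                writer.close();
            }
        }
    }
}
```

Passing true as the second argument would instead recompress every block with the writer's codec settings, which is the way to change the compression level while combining.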

>
> Thanks,
>
> Frank Grimes
>
>
> On 2012-01-12, at 1:14 PM, Scott Carey wrote:
>
>>
>>
>> On 1/12/12 8:27 AM, "Frank Grimes" <[EMAIL PROTECTED]> wrote:
>>
>>> Hi All,
>>>
>>> We have Avro data files in HDFS which are compressed using the Deflate
>>> codec.
>>> We have written an M/R job using the Avro Mapred API to combine those files.
>>>
>>> It seems to be working fine, however when we run it we notice that the
>>> temporary work area (spills, etc.) seems to be uncompressed.
>>> We're thinking we might see a speedup due to reduced I/O if the temporary
>>> files are compressed as well.
>>
>> If all you want to do is combine the files, there is no reason to deserialize
>> and reserialize the contents, and a map-only job could suffice.
>> If this is the case, you might want to consider one of two options:
>> 1.  Use a map only job, with a combined file input.  This will produce one
>> file per mapper and no intermediate data.
>> 2.  Use the Avro data file API to append to a file.  I am not sure if this
>> will work with HDFS without some modifications to Avro, but it should be
>> possible since the data file APIs can take InputStream/OutputStream.  The
>> data file API has the ability to append data blocks from the file if the
>> schemas are an exact match.  This can be done without deserialization, and
>> optionally can change the compression level or leave it alone.
>>
>>>
>>> Is there a way to enable "mapred.compress.map.output" in such a way that
>>> those temporary files are compressed as Avro/Deflate?
>>> I tried simply setting conf.setBoolean("mapred.compress.map.output", true);
>>> but it didn't seem to have any effect.
>>
>> I am not sure, as I haven't tried it myself.  However, the Avro M/R should be
>> able to leverage all of the Hadoop compressed intermediate forms.  LZO/Snappy
>> are fast and in our cluster Snappy is the default.  Deflate can be a lot
>> slower but much more compact.
>>
>>>
>>> Note that in order to avoid unnecessary sorting overhead, I made each key a
>>> constant (1L) so that the logs are combined but ordering isn't necessarily
>>> preserved. (we don't care about ordering)
>>
>> In that case, I think you can use a map only job.  There may be some work to
>> get a single mapper to read many files however.
>>
>>>
>>> FYI, here are my mapper and reducer.
>>>
>>>
>>> public static class AvroReachMapper
>>>     extends AvroMapper<DeliveryLogEvent, Pair<Long, DeliveryLogEvent>> {
>>>   public void map(DeliveryLogEvent levent,
>>>                   AvroCollector<Pair<Long, DeliveryLogEvent>> collector,
>>>                   Reporter reporter) throws IOException {
>>>
>>>     collector.collect(new Pair<Long, DeliveryLogEvent>(1L, levent));