RE: Hadoop throughput question
Artem Ervits 2013-01-04, 14:34
John, here are the two programs: the first is from chapter 4 of Hadoop: The Definitive Guide with slight modifications, and the second is in-house code similar to the example in chapter 3 of Hadoop in Action.

package sequencefileprocessor;

// cc SequenceFileReadDemo Reading a SequenceFile
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

// vv SequenceFileReadDemo
public class SequenceFileProcessor
{
    public static void main(String[] args) throws IOException
    {
        Configuration conf = new Configuration();
        conf.set("mapred.map.child.java.opts", "-Xmx256m");
        conf.set("mapred.reduce.child.java.opts", "-Xmx256m");
        //conf.set("io.file.buffer.size", "65536");  //10mb/sec improvement, jumped from 26mb/sec to 36mb/sec
        conf.set("io.file.buffer.size", "131072");  // 15mb/sec improvement, jumped from 26mb/sec to 39mb/sec

        FileSystem fs = null;
        Path path = null;
        int total_count = 0;
        int count = 0;
        long start = System.currentTimeMillis();

        for (String uri : args)
        {
            fs = FileSystem.get(URI.create(uri), conf);
            path = new Path(uri);

            SequenceFile.Reader reader = null;
            try
            {
                reader = new SequenceFile.Reader(fs, path, conf);
                Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
                Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
                long position = reader.getPosition();
                while (reader.next(key, value))
                {
                    String syncSeen = reader.syncSeen() ? "*" : "";
                    //System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
                    position = reader.getPosition(); // beginning of next record
                    count += 1;
                    //System.out.println("count is: " + count);

                    if((count % 1000000) == 0)
                        System.out.println("processed " + count + " records");
                }
            }
            finally
            {
                IOUtils.closeStream(reader);
            }
        }
        total_count += count;   // count is never reset per file, so this is the grand total
        System.out.println("Total count: " + total_count);
        System.out.println("Elapsed time: " + ((System.currentTimeMillis() - start) / 1000) + " seconds");
    }
}
// ^^ SequenceFileReadDemo
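
A side note on the io.file.buffer.size tuning above: on Hadoop 2.x and later the buffer can also be requested per reader through the options-based SequenceFile.Reader constructor rather than the Configuration key. A minimal sketch, assuming that API is available in the version in use and reusing the conf and path variables from the program above:

        // Sketch only: per-reader buffer size via the Hadoop 2.x Reader options.
        SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                SequenceFile.Reader.file(path),            // file to read
                SequenceFile.Reader.bufferSize(131072));   // 128k read buffer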

// The second program: the in-house HDFS read-speed test.
package hdfsspeedtest;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;

// Reads every "part" file under the directory given as args[0] and prints the total bytes read.

public class HDFSSpeedTest {

    public static void main(String[] args) throws Exception {

        System.out.println(new Date().toString());

        Path pt = new Path(args[0]);
        try {

            // Use this for reading the data.
            byte[] buffer = new byte[32*1024*1024];

            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            FileStatus[] inputFiles = fs.listStatus(pt);
            long total = 0;

            for (int i = 0; i < inputFiles.length; i++)
            {
                // Only read the job output files ("part-*").
                if (inputFiles[i].getPath().getName().startsWith("part"))
                {
                    System.out.println(inputFiles[i].getPath().getName());
                    FSDataInputStream inputStream = fs.open(inputFiles[i].getPath());

                    // read() fills the buffer and returns the number of bytes
                    // read (which may be less than the buffer size, but never
                    // more); -1 signals end of file.
                    int nRead = 0;
                    while ((nRead = inputStream.read(buffer)) != -1) {
                        total += nRead;
                    }

                    // Always close files.
                    inputStream.close();
                }
            }
            System.out.println("Read " + total + " bytes");
            System.out.println(new Date().toString());
        }
        catch(FileNotFoundException ex) {
            System.out.println(
                "Unable to open file '" +
                pt + "'");
        }
        catch(IOException ex) {
            System.out.println(
                "Error reading file '"
                + pt + "'");
            // Or we could just do this:
            // ex.printStackTrace();
        }
    }
}
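
The mb/sec figures quoted in this thread are presumably bytes read divided by elapsed wall-clock time; neither program prints that directly. A hypothetical addition around the read loop in HDFSSpeedTest above (startMillis is not in the original; total is the byte counter from the code):

        // Hypothetical throughput report for HDFSSpeedTest.
        long startMillis = System.currentTimeMillis();   // capture before the read loop
        // ... read loop runs, accumulating bytes into 'total' ...
        double elapsedSec = (System.currentTimeMillis() - startMillis) / 1000.0;
        double totalMB = total / (1024.0 * 1024.0);
        System.out.printf("Read %.1f MB in %.1f s (%.1f MB/sec)%n",
                totalMB, elapsedSec, totalMB / elapsedSec);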

From: John Lilley [mailto:[EMAIL PROTECTED]]
Sent: Thursday, January 03, 2013 9:04 PM
To: [EMAIL PROTECTED]
Subject: RE: Hadoop throughput question
Perhaps if Artem posted the presumably-simple code we could get other users to benchmark other 4-node systems and compare.
Artem Ervits <[EMAIL PROTECTED]> wrote:
Setting the property to 64k made the throughput jump to 36 MB/sec, and to 39 MB/sec with 128k.

Thank you for the tip.

From: Michael Katzenellenbogen [mailto:[EMAIL PROTECTED]]
Sent: Thursday, January 03, 2013 7:28 PM
To: [EMAIL PROTECTED]
Subject: Re: Hadoop throughput question

What is the value of the io.file.buffer.size property? Try tuning it up to 64k or 128k and see if this improves performance when reading SequenceFiles.
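
For a quick client-side check of this suggestion, the property can be set on the Configuration before the FileSystem and reader are created, which is essentially what the code at the top of this message ended up doing; the value is in bytes (65536 = 64k, 131072 = 128k):

        // Client-side override for a quick experiment; 131072 bytes = 128k.
        Configuration conf = new Configuration();
        conf.set("io.file.buffer.size", "131072");
        FileSystem fs = FileSystem.get(conf);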

-Michael

On Jan 3, 2013, at 7:00 PM, Artem Ervits <[EMAIL PROTECTED]> wrote: