Hive, mail # user - a Hive bug about UDF


王锋 2012-08-09, 11:04
Hi,
    Here is the source code of my UDF Minf:
import org.apache.hadoop.hive.ql.exec.UDF;
import com.sina.dip.util.DateUtil;
public class Minf extends UDF {
    public String evaluate(String time) {
        if (time != null && time.trim().length() > 0) {
            time = time.trim();
            // Strip the leading '[' that Apache log timestamps carry.
            time = time.replace("[", "");
            String yyyymmdd = DateUtil.DateToString(DateUtil.engStrToDate(time,
                    "dd/MMM/yyyy:HH:mm:ss"), "yyyyMMdd");
            int HH = Integer.parseInt(DateUtil.DateToString(DateUtil.engStrToDate(time,
                    "dd/MMM/yyyy:HH:mm:ss"), "HH"));
            int mm = Integer.parseInt(DateUtil.DateToString(DateUtil.engStrToDate(time,
                    "dd/MMM/yyyy:HH:mm:ss"), "mm"));
            // 5-minute bucket within the day: hour * 12 + minute / 5.
            int minif = HH * 12 + mm / 5;
            String Minif = yyyymmdd + String.format("%03d", minif);
            // Debug check: log any result that still contains '['. (The original
            // code tested contains("\\["), i.e. the literal two-character
            // sequence \[, which can never match.)
            if (Minif.contains("[")) {
                System.out.println("yyyymmdd=" + yyyymmdd + "\tMinf=" + Minif);
            }
            return Minif.trim();
        } else {
            return "";
        }
    }

    public static void main(String[] args) {
        String time = "[09/Aug/2012:16:49:59";
        System.out.println(new Minf().evaluate(time));
    }
}
This code transforms a timestamp from an Apache access log into a minf value (a 5-minute bucket ID within the day). For example, '[09/Aug/2012:16:49:59' --> 20120809201.
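DateUtil is our own helper and is not shown above, so here is a rough sketch of what it is assumed to do (hypothetical; the real com.sina.dip.util.DateUtil may differ, for example by caching a shared formatter):

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

// Hypothetical sketch of the DateUtil helpers called by the UDF above.
public class DateUtil {
    // Parses an English-month date string such as "09/Aug/2012:16:49:59".
    public static Date engStrToDate(String s, String pattern) {
        try {
            return new SimpleDateFormat(pattern, Locale.ENGLISH).parse(s);
        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
    }

    // Formats a Date with the given pattern, e.g. "yyyyMMdd" -> "20120809".
    public static String DateToString(Date d, String pattern) {
        return new SimpleDateFormat(pattern).format(d);
    }
}

Note that SimpleDateFormat is not thread-safe, so a DateUtil that reused one static formatter instance would work in a single-threaded test but could return corrupted strings if called from several threads at once.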
But when we use this UDF in Hive to process the Apache logs, we sometimes find that some of the minf values are wrong, such as:
[09/Aug/2012:16:49:59   201208[09201
[09/Aug/2012:16:49:59   201208[09201
 
But when we run a plain MapReduce job that calls the same evaluate method directly, the wrong values do not appear. The code:
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.sina.dip.util.DateUtil;
import com.sina.dip.util.RegexUtil; // package assumed; parseApache() is our own helper

public class TestMinf {
    public static String evaluate(String time) {
        if (time != null && time.trim().length() > 0) {
            time = time.trim();
            time = time.replace("[", "");
            String yyyymmdd = DateUtil.DateToString(DateUtil.engStrToDate(time,
                    "dd/MMM/yyyy:HH:mm:ss"), "yyyyMMdd");
            int HH = Integer.parseInt(DateUtil.DateToString(DateUtil.engStrToDate(time,
                    "dd/MMM/yyyy:HH:mm:ss"), "HH"));
            int mm = Integer.parseInt(DateUtil.DateToString(DateUtil.engStrToDate(time,
                    "dd/MMM/yyyy:HH:mm:ss"), "mm"));
            int minif = HH * 12 + mm / 5;
            String Minif = yyyymmdd + String.format("%03d", minif);
            // Same debug check as in the UDF above.
            if (Minif.contains("[")) {
                System.out.println("yyyymmdd=" + yyyymmdd + "\tMinf=" + Minif);
            }
            return Minif.trim();
        } else {
            return "";
        }
    }
    
    public static class ExtendLogMapper extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, LongWritable> {
        public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output,
                Reporter reporter) throws IOException {
            Text minf = new Text();
            String line = value.toString();
            // Extract the "createtime" field from the raw Apache log line.
            Map data = RegexUtil.parseApache(line);
            String createtime = (String) data.get("createtime");
            String _minf = evaluate(createtime);
            minf.set(_minf);
            output.collect(minf, new LongWritable(1));
        }
    }
    public static class ExtendLogReducer extends MapReduceBase implements
            Reducer<Text, LongWritable, Text, LongWritable> {
        public void reduce(Text key, Iterator<LongWritable> values,
                OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
            long sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new LongWritable(sum));
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration cf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(cf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: TestMinf <in> <out>");
            System.exit(1);
        }
        JobConf conf = new JobConf(TestMinf.class);
        conf.setJobName("testmr" );
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LongWritable.class);
        conf.setMapperClass(ExtendLogMapper.class);
        conf.setCombinerClass(ExtendLogReducer.class);
        conf.setReducerClass(ExtendLogReducer.class);
//        conf.setPartitionerClass(MyPartitioner.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        conf.setNumReduceTasks(1);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
Running this plain Java MR code over the same data, we could not find any wrong minf values.
The source data covered hour 16 of 2012-08-09 and was 24,739,162,624 bytes in size.
We think this problem may be caused by the Hive UDF. Please give me some suggestions.
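In the meantime, to rule out a thread-safety problem in our own code, one more check we could run is calling evaluate() from many threads at once and counting bad results. A minimal sketch (MinfConcurrencyTest is a hypothetical name; it only assumes Minf and DateUtil are on the classpath):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Multi-threaded stress test for evaluate(). If DateUtil holds shared
// mutable state (e.g. a static SimpleDateFormat), corrupted values like
// "201208[09201" may also show up here; if this never fails, the problem
// is more likely on the Hive side.
public class MinfConcurrencyTest {
    public static void main(String[] args) throws InterruptedException {
        final Minf udf = new Minf();
        final AtomicLong bad = new AtomicLong();
        ExecutorService pool = Executors.newFixedThreadPool(16);
        for (int i = 0; i < 100000; i++) {
            pool.submit(new Runnable() {
                public void run() {
                    String out = udf.evaluate("[09/Aug/2012:16:49:59");
                    if (!"20120809201".equals(out)) {
                        bad.incrementAndGet();
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.MINUTES);
        System.out.println("bad results: " + bad.get());
    }
}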
Thanks.