Flume >> mail # user >> Handling malformed data when using custom AvroEventSerializer and HDFS Sink


Re: Handling malformed data when using custom AvroEventSerializer and HDFS Sink
FWIW, here is an example of how this could be handled in a MorphlineInterceptor:

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    
    commands : [
      {
        tryRules {
          catchExceptions: true
          rules : [
            # first rule
            {
              commands : [
                # save initial state
                { setValues { _tmp : "@{_attachment_body}" } }
                
                # if JSON parsing succeeds, readJson replaces _attachment_body with a Jackson JSON object
                { readJson {} }
                
                # if we reach here the JSON parsing has succeeded
                # restore state prior to readJson command
                { setValues { _attachment_body : "@{_tmp}" } }
                { setValues { _tmp : [] } }
                { setValues { _attachment_mimetype : [] } }
              ]
            }
            
            # second rule is executed if the previous rule failed or threw an exception
            {
              commands : [
                { logDebug { format : "Marking event as malformed for downstream sink: {}", args: ["@{}"] } }
                { addValues { malformed : true } }
              ]
            }
            
          ]
        }
      }                                
    ]
  }
]

Also see http://kitesdk.org/docs/current/kite-morphlines/morphlinesReferenceGuide.html#tryRules
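To make the backtracking in tryRules concrete, here is a rough plain-Java model of what the config above does. It is only a sketch of the semantics as the reference guide describes them, not Kite's implementation: the record type (Map<String, List<Object>>), the class and method names, and the brace check standing in for readJson are all hypothetical. Each rule runs against its own copy of the record, so when the first rule throws on bad input, the second rule still sees the original body and only adds "malformed".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Simplified, hypothetical model of morphline tryRules semantics; not Kite's code. */
public class TryRulesSketch {

    /** Builds a record whose body field holds a single string value. */
    static Map<String, List<Object>> record(String body) {
        Map<String, List<Object>> r = new HashMap<>();
        r.put("_attachment_body", new ArrayList<>(List.of(body)));
        return r;
    }

    /** Copies the map and its value lists so a failed rule leaves the input untouched. */
    static Map<String, List<Object>> copy(Map<String, List<Object>> record) {
        Map<String, List<Object>> out = new HashMap<>();
        record.forEach((k, v) -> out.put(k, new ArrayList<>(v)));
        return out;
    }

    /** Tries rules top-down, each on a fresh copy; returns the first successful result. */
    static Map<String, List<Object>> tryRules(Map<String, List<Object>> record,
            List<Predicate<Map<String, List<Object>>>> rules, boolean catchExceptions) {
        for (Predicate<Map<String, List<Object>>> rule : rules) {
            Map<String, List<Object>> attempt = copy(record); // backtracking point
            try {
                if (rule.test(attempt)) {
                    return attempt;
                }
            } catch (RuntimeException e) {
                if (!catchExceptions) {
                    throw e; // without catchExceptions the exception propagates
                }
                // with catchExceptions, an exception simply counts as a failed rule
            }
        }
        throw new IllegalStateException("all rules failed");
    }

    /** The two rules from the config above, with a crude stand-in for readJson. */
    static List<Predicate<Map<String, List<Object>>>> malformedRules() {
        return List.of(
            r -> {
                String body = String.valueOf(r.get("_attachment_body").get(0));
                String t = body.trim();
                // Placeholder check only; a real rule would run a JSON parser here.
                if (!(t.startsWith("{") && t.endsWith("}"))) {
                    throw new IllegalArgumentException("cannot parse as JSON: " + body);
                }
                return true; // body left as-is, like the restore step in the config
            },
            r -> {
                // fallback rule: tag the record for the downstream sink
                r.put("malformed", new ArrayList<>(List.of(true)));
                return true;
            });
    }

    public static void main(String[] args) {
        for (String body : List.of("{\"id\": 1}", "id=1,broken")) {
            Map<String, List<Object>> out = tryRules(record(body), malformedRules(), true);
            System.out.println(body + " -> malformed=" + out.containsKey("malformed"));
        }
    }
}
```

Running main prints malformed=false for the JSON body and malformed=true for the broken one. Note that valid events get no "malformed" field at all, so whatever routes on it downstream has to treat a missing value as "good".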

Wolfgang.

On Jan 3, 2014, at 2:03 AM, ed wrote:

> Thank you, Brock, Devin, and Jimmy, for the great information. Dumping null values in the EventSerializer write method looks really easy to do, but I think using the custom interceptor to validate and then tag the event for proper good/bad routing is a great idea and seems to fit the Flume way of doing things better.
>
> Thank you again!
>
> ~Ed
>
>
> On Fri, Jan 3, 2014 at 2:40 AM, Devin Suiter RDX <[EMAIL PROTECTED]> wrote:
> Yes, the regex interceptors and selectors can be very powerful; experimenting with them was really exciting.
>
> Brock, thanks for validating the ML idea. As with most things, the simplest solution is probably the way to go, and in this use case, morphlines might be overkill.
>
> Devin Suiter
> Jr. Data Solutions Software Engineer
>
> 100 Sandusky Street | 2nd Floor | Pittsburgh, PA 15212
> Google Voice: 412-256-8556 | www.rdx.com
>
>
> On Thu, Jan 2, 2014 at 12:27 PM, Brock Noland <[EMAIL PROTECTED]> wrote:
> Jimmy, great to hear that method is working for you!
>
> Devin, regarding the morphlines question: since ML can have arbitrary Java plugins, it *can* do just about anything. I generally think of ML as the T in ETL, so doing the validation in ML might make sense. In general, though, I think adding the custom header field is probably the best option for dealing with bad data.
>
> Once again, thank you everyone for using our software!
>
>
> On Thu, Jan 2, 2014 at 10:10 AM, Jimmy <[EMAIL PROTECTED]> wrote:
> We are doing something similar to what Brock mentioned: a simple interceptor validates the JSON and updates a custom field in the header, then the Flume HDFS sink pushes the data to a good/bad target directory based on this custom field. A separate process then watches the bad directory.
>
> You could add notification to the Flume flow; we wanted to keep it very simple.
>
> ---------- Forwarded message ----------
> From: Devin Suiter RDX <[EMAIL PROTECTED]>
> Date: Thu, Jan 2, 2014 at 7:40 AM
> Subject: Re: Handling malformed data when using custom AvroEventSerializer and HDFS Sink
> To: [EMAIL PROTECTED]
>
>
> Just throwing this out there, since I haven't had time to dig into the API with a big fork: can morphlines offer any assistance here?
>
> Some kind of interceptor that would parse for malformed data, package the offending data and send it somewhere (email it, log it), then project a valid "there was something wrong here" value into the field and allow your channel to carry on? Or skip the projection piece and just move along? I was thinking that projecting known data into a field that previously held malformed data would let you easily locate those records later while keeping your data shape consistent.
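The approach Brock recommends and Jimmy describes (an interceptor validates the body, records the result in a custom header, and the HDFS sink picks a good/bad directory from that header) can be sketched without Flume on the classpath. In a real agent the tagging would sit in a class implementing Flume's org.apache.flume.interceptor.Interceptor, and the sink's hdfs.path would reference the header through the %{header} escape. Everything here is an illustrative assumption: SimpleEvent stands in for Flume's Event, the "route" header and the /flume/events/... path are hypothetical, resolvePath imitates only the header escape, and looksLikeJsonObject is a crude brace-balancing check rather than a real JSON parser.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical, Flume-free sketch of "validate, tag a header, route on it". */
public class TagAndRouteSketch {

    /** Stand-in for org.apache.flume.Event: string headers plus a byte[] body. */
    static final class SimpleEvent {
        final Map<String, String> headers = new HashMap<>();
        final byte[] body;

        SimpleEvent(String body) {
            this.body = body.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Hypothetical header name; its value becomes the last path segment. */
    static final String ROUTE_HEADER = "route";

    private static final Pattern HEADER_ESCAPE = Pattern.compile("%\\{([^}]+)\\}");

    /**
     * Crude stand-in for JSON validation: a single {...} object whose braces and
     * brackets balance outside string literals. It does not check JSON grammar.
     */
    static boolean looksLikeJsonObject(String s) {
        String t = s.trim();
        if (!t.startsWith("{") || !t.endsWith("}")) {
            return false;
        }
        int depth = 0;
        boolean inString = false;
        for (int i = 0; i < t.length(); i++) {
            char c = t.charAt(i);
            if (inString) {
                if (c == '\\') {
                    i++; // skip the escaped character
                } else if (c == '"') {
                    inString = false;
                }
            } else if (c == '"') {
                inString = true;
            } else if (c == '{' || c == '[') {
                depth++;
            } else if (c == '}' || c == ']') {
                depth--;
                // the outer object may only close at the very end
                if (depth < 0 || (depth == 0 && i != t.length() - 1)) {
                    return false;
                }
            }
        }
        return depth == 0 && !inString;
    }

    /** Mirrors the role of Interceptor.intercept(Event): tag the event, never drop it. */
    static SimpleEvent intercept(SimpleEvent event) {
        String body = new String(event.body, StandardCharsets.UTF_8);
        event.headers.put(ROUTE_HEADER, looksLikeJsonObject(body) ? "good" : "bad");
        return event;
    }

    /** Imitates only the %{header} escape in hdfs.path; missing headers become "". */
    static String resolvePath(String template, Map<String, String> headers) {
        Matcher m = HEADER_ESCAPE.matcher(template);
        StringBuilder sb = new StringBuilder();
        while (m.find()) {
            String value = headers.getOrDefault(m.group(1), "");
            m.appendReplacement(sb, Matcher.quoteReplacement(value));
        }
        m.appendTail(sb);
        return sb.toString();
    }

    public static void main(String[] args) {
        String template = "/flume/events/%{route}"; // hypothetical hdfs.path value
        for (String body : List.of("{\"user\": \"ed\", \"n\": 3}", "{\"user\": \"ed\",")) {
            SimpleEvent e = intercept(new SimpleEvent(body));
            System.out.println(resolvePath(template, e.headers) + " <- " + body);
        }
    }
}
```

Running main should print one path ending in /good and one ending in /bad. Tagging instead of dropping keeps every event in the flow, which is what lets a separate process watch the bad directory afterwards.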