Kafka, mail # user - java.io.IOException: Broken pipe


Re: java.io.IOException: Broken pipe
Yonghui Zhao 2013-03-20, 08:36
Thanks Neha,

After enabling INFO logging in the consumer, I see two exceptions on the
consumer side. Any idea what causes them?

2013/03/20 14:52:00.585 INFO [SimpleConsumer] [] Reconnect in multifetch due to socket error:
java.nio.channels.ClosedChannelException
    at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:120)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:163)
    at kafka.utils.Utils$.read(Utils.scala:538)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
    at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
    at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
    at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
2013/03/20 14:52:03.678 INFO [SimpleConsumer] [] Reconnect in multifetch due to socket error:
java.nio.channels.ClosedByInterruptException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:270)
    at kafka.utils.Utils$.read(Utils.scala:538)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
    at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
    at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
    at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
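
For reference, here is a minimal sketch of what these two exception types mean in plain java.nio, independent of Kafka. It is not taken from the Kafka or senseidb code; the class name InterruptedReadDemo and the method run() are made up for illustration. It shows that interrupting a thread blocked in SocketChannel.read closes the channel and raises ClosedByInterruptException, and that a later read on the same channel raises ClosedChannelException. Whether a thread interrupt is what actually happens in the consumer above is only an assumption.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of Kafka or senseidb.
public class InterruptedReadDemo {

    /**
     * Returns two exception names: the one raised by a read that was
     * interrupted, and the one raised by a second read on the same channel.
     */
    public static String[] run() throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Bind to an ephemeral local port so the demo needs no external service.
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            SocketChannel client = SocketChannel.open(server.getLocalAddress());
            // The accepted side never writes, so the client read below blocks.
            SocketChannel accepted = server.accept();

            AtomicReference<String> first = new AtomicReference<>("no exception");
            Thread reader = new Thread(() -> {
                try {
                    client.read(ByteBuffer.allocate(16));
                } catch (IOException e) {
                    first.set(e.getClass().getSimpleName());
                }
            });
            reader.start();
            Thread.sleep(200);   // give the reader time to block in read()
            reader.interrupt();  // the interrupt closes the channel
            reader.join();

            String second = "no exception";
            try {
                // The channel is already closed, so this read fails immediately.
                client.read(ByteBuffer.allocate(16));
            } catch (IOException e) {
                second = e.getClass().getSimpleName();
            }
            accepted.close();
            return new String[] { first.get(), second };
        }
    }

    public static void main(String[] args) throws Exception {
        String[] names = run();
        System.out.println("interrupted read: " + names[0]);
        System.out.println("read after close: " + names[1]);
    }
}
```

If something similar is going on here, it would be worth checking whether anything interrupts or shuts down the fetcher threads on the consumer side.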

2013/3/19 Neha Narkhede <[EMAIL PROTECTED]>

> Modify the log4j properties for senseidb and set kafka.consumer to INFO.
> You can check the senseidb startup scripts to see how they configure
> log4j.
>
> Thanks,
> Neha
>
> On Tuesday, March 19, 2013, Yonghui Zhao wrote:
>
> > Hi Neha,
> >
> > How can I enable all Kafka consumer logging in senseidb?
> > Btw, I am using the Kafka 0.7.2 Java client.
> >
> > 2013/3/19 Neha Narkhede <[EMAIL PROTECTED]>
> >
> > > The logs show that senseidb is prematurely closing the socket
> > > connection to the Kafka broker. I would enable at least INFO logging
> > > for Kafka in senseidb to see what the issue is.
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Monday, March 18, 2013, Yonghui Zhao wrote:
> > >
> > > > Thanks Neha,
> > > >
> > > > I use one Kafka server with 4 partitions and 3 consumers (senseidb).
> > > >
> > > > Kafka server producer input rate is about 10k.
> > > > And each consumer consuming rate is about 3k.
> > > >
> > > > I see this exception many times. Kafka logs it for each consumer,
> > > > but I didn't find any error log on the consumer side, and the
> > > > consumer (senseidb) is alive all the time.
> > > > Is it possible the exception is related to the high input/output rate?
> > > >
> > > > Sometimes another exception (*Connection reset by peer*) also happens.
> > > >
> > > > [2013-03-18 21:18:29,107] ERROR Closing socket for /10.2.201.203 because of error (kafka.network.Processor)
> > > > java.io.IOException: Connection reset by peer
> > > >     at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
> > > >     at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:456)
> > > >     at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:557)
> > > >     at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:102)
> > > >     at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:53)
> > > >     at kafka.network.MultiSend.writeTo(Transmission.scala:91)
> > > >     at kafka.network.Processor.write(SocketServer.scala:339)