Kafka, mail # user - How to use gevent with python


Re: How to use gevent with python
David Montgomery 2013-11-10, 19:54
I should mention that I am using Kafka 0.7.x, and not by choice: Druid,
the app I am using, only supports 0.7.x. Brod fails under gevent with
0.7.2. Brod does not work with gevent at all.

Thanks

On Mon, Nov 11, 2013 at 3:06 AM, Marc Labbe <[EMAIL PROTECTED]> wrote:

> Hi David,
>
> check out mahendra's fork of kafka-python; he has implemented gevent
> support in a branch (https://github.com/mahendra/kafka-python/tree/gevent)
> but it hasn't made it to the main repo (yet). I am not sure why and I
> haven't tested it myself.
>
> For brod, AFAIK, it doesn't support 0.8 so you have to use it with 0.7.x.
> On the other hand, kafka-python is the only python library I know of that
> supports 0.8.
>
> cheers,
> marc
>
>
> On Sat, Nov 9, 2013 at 6:34 PM, David Montgomery <[EMAIL PROTECTED]> wrote:
>
> > Hi,
> >
> > Going even further: using kafka-python
> > (https://github.com/mumrah/kafka-python) I get the error below. There is
> > nothing wrong with my Kafka server. I can telnet to 9092 just fine.
> >
> > Again, I am hoping for a Python client that works with gevent.
> > kafka-python did not even make it past the connection.
> >
> > kafka = KafkaClient(kafka_domain, 9092)
> > No handlers could be found for logger "kafka"
> > Traceback (most recent call last):
> >   File "/home/ubuntu/workspace/rtbhui-devops/servers/worker_server.py", line 133, in <module>
> >     kafka = KafkaClient(kafka_domain, 9092)
> >   File "/usr/local/lib/python2.7/dist-packages/kafka_python-0.8.1_1-py2.7.egg/kafka/client.py", line 32, in __init__
> >     self._load_metadata_for_topics()
> >   File "/usr/local/lib/python2.7/dist-packages/kafka_python-0.8.1_1-py2.7.egg/kafka/client.py", line 69, in _load_metadata_for_topics
> >     raise Exception("All servers failed to process request")
> > Exception: All servers failed to process request
> >
> >
> >
> > On Sun, Nov 10, 2013 at 5:47 AM, David Montgomery <[EMAIL PROTECTED]> wrote:
> >
> > > Hi,
> > >
> > > I asked this question on StackOverflow. No response. No solution seems
> > > to work, even for a Python lib that was designed for gevent and Kafka.
> > > Is there any other Python library that works with gevent?
> > >
> > > I am trying to use gevent to write to Kafka using brod on Python 2.7.
> > >
> > > Here is the error message I get. I guess it has to do with blocking.
> > > brod supports Tornado but I use gevent.
> > >
> > > No handlers could be found for logger "brod.socket"
> > > Traceback (most recent call last):
> > >   File "/var/chef/cache/src/gevent/gevent/greenlet.py", line 328, in run
> > >     result = self._run(*self.args, **self.kwargs)
> > >   File "worker_server.py", line 204, in execute_kafka_pipe
> > >     kafka.produce(topic,payload)
> > >   File "/usr/local/lib/python2.7/dist-packages/brod-0.3.2-py2.7.egg/brod/base.py", line 287, in produce
> > >     return self._write(request, callback)
> > >   File "/usr/local/lib/python2.7/dist-packages/brod-0.3.2-py2.7.egg/brod/blocking.py", line 98, in _write
> > >     return self._write(data, callback, retries)
> > >   File "/usr/local/lib/python2.7/dist-packages/brod-0.3.2-py2.7.egg/brod/blocking.py", line 89, in _write
> > >     wrote_length += self._socket.send(data)
> > >   File "/var/chef/cache/src/gevent/gevent/socket.py", line 441, in send
> > >     self._wait(self._write_event)
> > >   File "/var/chef/cache/src/gevent/gevent/socket.py", line 292, in _wait
> > >     assert watcher.callback is None, 'This socket is already used by another greenlet: %r' % (watcher.callback, )
> > > AssertionError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent.hub.Waiter object at 0x1dece60>>
> > > <Greenlet at 0x1d4b9b0: execute_kafka_pipe('topic-spend', '{"enode": 1, "city": "Cairns", "dl": "en", "wnode)> failed with AssertionError
> > >
> > > > I tried to use gevent-kafka, but it depends on gevent-zookeeper.
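
The AssertionError in the brod traceback above is what gevent raises when a second greenlet tries to wait on a socket that another greenlet is already waiting on, which suggests several greenlets are sharing one producer connection. One possible workaround, not confirmed anywhere in this thread, is to serialize access to the shared client with a lock so that only one greenlet touches the socket at a time. The sketch below assumes a client object exposing produce(topic, payload), as in the traceback. SerializedProducer, RecordingProducer and demo_with_gevent are hypothetical names made up for illustration, and RecordingProducer is only a stand-in for a real client.

```python
class SerializedProducer(object):
    """Wrap a producer so only one caller at a time reaches its socket."""

    def __init__(self, producer, lock):
        self._producer = producer
        # Any lock usable as a context manager; under gevent this should be
        # a cooperative lock such as gevent.lock.BoundedSemaphore.
        self._lock = lock

    def produce(self, topic, payload):
        # Other greenlets block here instead of entering socket.send()
        # while a write is still in flight.
        with self._lock:
            return self._producer.produce(topic, payload)


class RecordingProducer(object):
    """Hypothetical stand-in for a real Kafka client; it only records calls."""

    def __init__(self):
        self.sent = []

    def produce(self, topic, payload):
        self.sent.append((topic, payload))
        return len(payload)


def demo_with_gevent(messages):
    """Spawn one greenlet per message, all sharing a single wrapped producer."""
    # Imported here so the wrapper above has no hard dependency on gevent.
    import gevent
    from gevent.lock import BoundedSemaphore

    producer = SerializedProducer(RecordingProducer(), BoundedSemaphore(1))
    jobs = [gevent.spawn(producer.produce, "topic-spend", m) for m in messages]
    gevent.joinall(jobs)
    return [job.value for job in jobs]
```

In worker_server.py the real client would take the place of RecordingProducer, and execute_kafka_pipe would call produce on the wrapper. Giving each greenlet its own connection is another option that avoids the lock, at the cost of more sockets.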