Kafka, mail # user - How to use gevent with python


Re: How to use gevent with python
Kane Kane 2013-11-10, 19:26
How is it possible to have an async consumer?
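
On the "This socket is already used by another greenlet" assertion in the brod traceback quoted below: it is raised when two greenlets wait on the same gevent socket at once. One possible workaround is to serialize writes to the shared connection with a lock. The following is only a minimal sketch under stated assumptions: `FakeSocket`, `LockedProducer` and `run_workers` are hypothetical names, not part of brod or kafka-python, and it uses the stdlib `threading.Lock` with threads so it runs without gevent installed. With gevent one would presumably pass a cooperative lock (for example `gevent.lock.RLock()`) and spawn greenlets instead.

```python
import threading


class FakeSocket(object):
    """Hypothetical stand-in for a real connection; records what was sent."""

    def __init__(self):
        self.sent = []

    def send(self, data):
        self.sent.append(data)
        return len(data)


class LockedProducer(object):
    """Hypothetical wrapper: only one caller at a time may use the socket."""

    def __init__(self, sock, lock=None):
        self._sock = sock
        # Assumption: under gevent, pass a cooperative lock here instead
        # (for example gevent.lock.RLock()) so waiting greenlets yield.
        self._lock = lock if lock is not None else threading.Lock()

    def produce(self, payload):
        # Hold the lock for the whole write so sends never overlap.
        with self._lock:
            return self._sock.send(payload)


def run_workers(producer, n_workers, n_messages):
    """Send n_messages from each of n_workers concurrent workers."""

    def worker(worker_id):
        for i in range(n_messages):
            producer.produce(("w%d-m%d" % (worker_id, i)).encode("utf-8"))

    threads = [threading.Thread(target=worker, args=(w,))
               for w in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


sock = FakeSocket()
producer = LockedProducer(sock)
run_workers(producer, n_workers=4, n_messages=25)
print(len(sock.sent))
```

The alternative design is to give each greenlet its own client and socket, which avoids the lock at the cost of more connections to the broker.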

On Sun, Nov 10, 2013 at 11:06 AM, Marc Labbe <[EMAIL PROTECTED]> wrote:
> Hi David,
>
> Check out mahendra's fork of kafka-python; he has implemented gevent
> support in a branch (https://github.com/mahendra/kafka-python/tree/gevent), but
> it hasn't made it into the main repo (yet). I am not sure why, and I haven't
> tested it myself.
>
> As for brod, AFAIK it doesn't support 0.8, so you have to use it with 0.7.x.
> On the other hand, kafka-python is the only Python library I know of that
> supports 0.8.
>
> cheers,
> marc
>
>
> On Sat, Nov 9, 2013 at 6:34 PM, David Montgomery <[EMAIL PROTECTED]> wrote:
>
>> Hi,
>>
>> Going even further: using kafka-python (
>> https://github.com/mumrah/kafka-python) I get the error below.  There is
>> nothing wrong with my Kafka server.  I can telnet to 9092 just fine.
>>
>> Again, I am hoping for a Python client that works with gevent.  kafka-python did
>> not even make it past the connection.
>>
>> kafka = KafkaClient(kafka_domain, 9092)
>> No handlers could be found for logger "kafka"
>> Traceback (most recent call last):
>>   File "/home/ubuntu/workspace/rtbhui-devops/servers/worker_server.py", line 133, in <module>
>>     kafka = KafkaClient(kafka_domain, 9092)
>>   File "/usr/local/lib/python2.7/dist-packages/kafka_python-0.8.1_1-py2.7.egg/kafka/client.py", line 32, in __init__
>>     self._load_metadata_for_topics()
>>   File "/usr/local/lib/python2.7/dist-packages/kafka_python-0.8.1_1-py2.7.egg/kafka/client.py", line 69, in _load_metadata_for_topics
>>     raise Exception("All servers failed to process request")
>> Exception: All servers failed to process request
>>
>>
>>
>> On Sun, Nov 10, 2013 at 5:47 AM, David Montgomery <[EMAIL PROTECTED]> wrote:
>>
>> > Hi,
>> >
>> > I asked this question on StackOverflow.  No response.  No solution seems
>> > to work, even with a Python lib that was designed for gevent and Kafka.
>> > Is there any other Python library that works with gevent?
>> >
>> >
>> >
>> >
>> > I am trying to use gevent to write to Kafka using brod on Python 2.7.
>> >
>> > Here is the error message I get. I guess it has to do with blocking. brod supports
>> > tornado but I use gevent.
>> >
>> > No handlers could be found for logger "brod.socket"
>> > Traceback (most recent call last):
>> >   File "/var/chef/cache/src/gevent/gevent/greenlet.py", line 328, in run
>> >     result = self._run(*self.args, **self.kwargs)
>> >   File "worker_server.py", line 204, in execute_kafka_pipe
>> >     kafka.produce(topic,payload)
>> >   File "/usr/local/lib/python2.7/dist-packages/brod-0.3.2-py2.7.egg/brod/base.py", line 287, in produce
>> >     return self._write(request, callback)
>> >   File "/usr/local/lib/python2.7/dist-packages/brod-0.3.2-py2.7.egg/brod/blocking.py", line 98, in _write
>> >     return self._write(data, callback, retries)
>> >   File "/usr/local/lib/python2.7/dist-packages/brod-0.3.2-py2.7.egg/brod/blocking.py", line 89, in _write
>> >     wrote_length += self._socket.send(data)
>> >   File "/var/chef/cache/src/gevent/gevent/socket.py", line 441, in send
>> >     self._wait(self._write_event)
>> >   File "/var/chef/cache/src/gevent/gevent/socket.py", line 292, in _wait
>> >     assert watcher.callback is None, 'This socket is already used by another greenlet: %r' % (watcher.callback, )
>> > AssertionError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent.hub.Waiter object at 0x1dece60>>
>> > <Greenlet at 0x1d4b9b0: execute_kafka_pipe('topic-spend', '{"enode": 1, "city": "Cairns", "dl": "en", "wnode)> failed with AssertionError
>> >
>> > I tried to use gevent-kafka, but it depends on gevent-zookeeper.
>> >
>> > When trying to connect to ZooKeeper I get this message:
>> >
>> > Traceback (most recent call last):
>> >   File "/home/ubuntu/workspace/devops/servers/worker_server.py", line 68, in <module>
>> >     framework = gevent_zookeeper.ZookeeperFramework('localhost:2181', 10)