Я сделал это, как показано ниже
Example: using a pika consumer without blocking the calling thread - pika with gRPC streaming
###########
def grpc_test(self, request, context):
    """Stream RabbitMQ messages to the gRPC client as ``proto.Data`` objects.

    Response-streaming RPC handler: a ``TestConsumer`` is registered on a
    RabbitMQ queue and run in its own thread; every message body it receives
    is handed over through an in-process queue, parsed into ``proto.Data``
    and yielded to the client.

    Args:
        request: The incoming RPC request (not inspected here).
        context: The RPC context (not inspected here).

    Yields:
        ``proto.Data`` instances parsed from the raw message bodies.
    """
    # Hand-off buffer between the consumer thread and this generator.
    message_queue = Queue.Queue()

    def rmq_callback(data):
        # Runs in the consumer thread; only enqueues, parsing happens below.
        print("Got a call back from RMQ Client")
        message_queue.put(data)

    # Register with RabbitMQ for data: the connection and channel are created
    # here, then used by the consumer thread only (and by no other thread).
    pikaconsumer = TestConsumer()
    # The client wants to listen on this queue. listen_on_queue() requires an
    # exchange and routing keys as well; an empty key list requests no extra
    # bindings, so only the queue itself is declared and consumed.
    pikaconsumer.listen_on_queue("xxxx", "", [], rmq_callback)

    t = threading.Thread(target=pikaconsumer.start_consuming)
    # start_consuming() never returns on its own; a daemon thread keeps it
    # from blocking interpreter shutdown.
    t.daemon = True
    t.start()

    # NOTE(review): this loop has no exit condition of its own; it runs until
    # the generator is closed from outside.
    while True:
        data = message_queue.get(True)  # block until the next body arrives
        message = proto.Data()
        message.ParseFromString(data)
        yield message
###########
class TestConsumer(object):
    """Blocking pika consumer that owns one connection and one channel.

    Typical use: construct, call ``listen_on_queue`` to declare/bind the
    queue and register a callback, then run ``start_consuming`` (usually in
    a dedicated thread, since it blocks).
    """

    def __init__(self, amqp_url='amqp://guest:guest@127.0.0.1:5672/'):
        """Open a blocking connection to the broker and a channel on it.

        Args:
            amqp_url: AMQP URL of the broker. Defaults to the local guest
                account that was previously hard-coded.
        """
        parameters = pika.URLParameters(amqp_url)
        # Keep the connection reachable from the instance; previously only
        # the channel was stored and the connection handle was dropped.
        self._connection = pika.BlockingConnection(parameters)
        self._channel = self._connection.channel()
        self._consumer_tag = None

    def listen_on_queue(self, queue_name, exchange, routing_keys, _callback):
        """Declare and bind a queue, then register ``_callback`` on it.

        Args:
            queue_name: Name of the queue to declare and consume from.
            exchange: Exchange the queue is bound to for each routing key.
            routing_keys: Iterable of routing keys; each is converted with
                ``str()`` before binding. May be empty (no bindings made).
            _callback: Callable invoked with the message body of every
                delivery.
        """
        # In case the queue is not there - create it.
        self._channel.queue_declare(queue=queue_name, auto_delete=True)

        for routing_key in routing_keys:
            key = str(routing_key)
            self._channel.queue_bind(queue_name, exchange, key)
            LOGGER.info('Binding Exchange[%s] to Queue[%s] with RoutingKey[%s]',
                        exchange, queue_name, key)

        def on_message(channel, method_frame, header_frame, body):
            print(method_frame.delivery_tag)
            _callback(body)
            # Acknowledge only after the callback has returned; if it
            # raises, the delivery is left unacknowledged.
            channel.basic_ack(delivery_tag=method_frame.delivery_tag)

        # Positional (callback, queue) order kept exactly as before.
        # NOTE(review): pika >= 1.0 expects (queue, on_message_callback);
        # confirm against the installed pika version.
        self._consumer_tag = self._channel.basic_consume(on_message,
                                                         queue_name)

    def start_consuming(self):
        """Run the channel's consume loop; delegates to the pika channel."""
        self._channel.start_consuming()