Как вы отвечаете клиенту RabbitMQ RP C с несколькими сообщениями? - PullRequest
0 голосов
/ 28 февраля 2020

Я пытаюсь использовать RabbitMQ в среде RP C, где каждый удаленный вызов будет занимать значительное время, постоянно принося результаты. Я хочу, чтобы результаты доставлялись клиенту по мере их генерации.

Я начал со стандартного учебного примера RP C, затем изменил его, чтобы использовать «Прямой ответ». Я публикую sh все промежуточные результаты обратно в «анонимную исключительную очередь обратного вызова» без подтверждения исходного запроса. Когда обработка завершена, я отправляю окончательное сообщение клиенту и затем подтверждаю исходный запрос. Но клиент видит только первое промежуточное сообщение. Мой клиент находится в PHP, а мой сервер в Python, но я подозреваю, что это не имеет значения. У кого-нибудь есть волхвы c, чтобы заставить это работать? Я могу опубликовать код, но это довольно простой материал из поваренной книги.

1 Ответ

0 голосов
/ 28 февраля 2020

Отвечая на мой собственный вопрос. Работало следующее:

php клиент:

#!/usr/bin/php
<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class RpcClient {
    private $connection;
    private $channel;
    private $callback_queue;
    private $response;
    private $corr_id;

    public function __construct() {
        $this->connection = new AMQPStreamConnection(
            'localhost', 5672, 'guest', 'guest'
        );
        $this->channel = $this->connection->channel();

        list($this->callback_queue, ,) = $this->channel->queue_declare(
            "", false, false, true, false
        );


        # For direct reply-to, need to consume amq.rabbitmq.repy-to, a special queue name
        # Unclear what happens to the declare above
        $this->channel->basic_consume(
                $this->callback_queue, '', false, true,
                false, false, array($this, 'onResponse')
        );
    }

    # This is going to be called once for each message coming back
    public function onResponse($rep) {
        if ($rep->get('correlation_id') == $this->corr_id) {
                $response = json_decode($rep->body, true);
                echo print_r($response['line'], true);
                if ($response['type'] == 'final') {
                        $this->response = $rep->body;
                }
        }
    }

    public function call($message_array) {
        $this->response = null;
        $this->corr_id = uniqid();

        $jsonm = json_encode($message_array);
        $msg = new AMQPMessage(
            $jsonm,
            array(
                'correlation_id' => $this->corr_id,
                ### Not sure which of the next two lines is the correct one... if either....
                ##'reply_to' => 'amq.rabbitmq.reply-to' # This is when using direct reply-to
                'reply_to' => $this->callback_queue
            )
        );
        $this->channel->basic_publish($msg, '', 'ansiblePB_rpc_queue');
        while (!$this->response) {
            $this->channel->wait();
        }
        return intval($this->response);
    }
}

$ansiblepb_rpc = new RpcClient();
$response = $ansiblepb_rpc->call(array('userID' => 'jb1234', 
                                       'user_display_name' => 'Joe Bloe',
                                       'limit' => '24000'));
echo ' [.] Got ', $response, "\n";
?>

Python сервер:

#!/usr/bin/env python
""" 1 """
import glob
import json
import platform
import os
import re
import shutil
import subprocess
import time
import yaml

import pika

class RMQmultireply(object):
    """ Generic class to support ansible_playbook on a Rabbit MQ RPC queue"""
    def __init__(self, channel, method, props):
        #""" Constructor.... duh """
        self.channel = channel
        self.method = method
        self.props = props

    def run(self, userID, username, limit):
        """ Run the main guts of the service """

        cmd = ['/home/dhutchin/devel/rmq/multilineoutput']

        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        for line in proc.stdout.readlines():
            intermediate_json_result = json.dumps({'type': 'intermediate', 'line': line})

            self.channel.basic_publish(exchange='',
                                       routing_key=self.props.reply_to,
                                       properties=pika.BasicProperties(
                                           correlation_id=self.props.correlation_id),
                                       body=str(intermediate_json_result))
            #self.channel.basic_ack(delivery_tag=self.method.delivery_tag)

        proc.wait()
        return proc.returncode


def on_request(channel, method, props, jsonstring):
    """ Request has just come in to run ansible_playbook """


    playbook = RMQmultireply(channel, method, props)

    # fork and exec a playbook
    #  Recieve each line of output and send them as received back
    #  to the requestor.
    #  .run does not return until playbook exits.
    # Use "Direct Reply-to" mechanism to return multiple messages to
    # our client.
    request = yaml.load(jsonstring)  # Yes, yaml works better than JSON
    returncode = playbook.run(request['userID'], request['user_display_name'], request['limit'])

    final_json_result = json.dumps({'type': "final", 'line': '', 'rc': returncode})

    channel.basic_publish(exchange='',
                          routing_key=props.reply_to,
                          properties=pika.BasicProperties(correlation_id=
                                                          props.correlation_id),
                          body=str(final_json_result))

    # Acknowlege the original message so that RabbitMQ can remove it
    # from the ansiblePB_rpc_queue queue
    channel.basic_ack(delivery_tag=method.delivery_tag)


def main():
    """ Its kinda obvious what this does """

    try:
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
    except Exception:
        print "pika.BlockingConnection.... failed... maybe RabbitMQ is not running"
        quit()

    channel = connection.channel()

    channel.queue_declare(queue='ansiblePB_rpc_queue')

    channel.basic_qos(prefetch_count=1)
    # auto_ack is turned off by default, so we don't need to specify auto_ack=False
    channel.basic_consume(queue='ansiblePB_rpc_queue', on_message_callback=on_request)

    print " [x] Awaiting RPC requests"
    channel.start_consuming()


if __name__ == '__main__':
    main()
...