Отвечая на мой собственный вопрос. Работало следующее:
php клиент:
#!/usr/bin/php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RpcClient {
private $connection;
private $channel;
private $callback_queue;
private $response;
private $corr_id;
public function __construct() {
$this->connection = new AMQPStreamConnection(
'localhost', 5672, 'guest', 'guest'
);
$this->channel = $this->connection->channel();
list($this->callback_queue, ,) = $this->channel->queue_declare(
"", false, false, true, false
);
# For direct reply-to, need to consume amq.rabbitmq.repy-to, a special queue name
# Unclear what happens to the declare above
$this->channel->basic_consume(
$this->callback_queue, '', false, true,
false, false, array($this, 'onResponse')
);
}
# This is going to be called once for each message coming back
public function onResponse($rep) {
if ($rep->get('correlation_id') == $this->corr_id) {
$response = json_decode($rep->body, true);
echo print_r($response['line'], true);
if ($response['type'] == 'final') {
$this->response = $rep->body;
}
}
}
public function call($message_array) {
$this->response = null;
$this->corr_id = uniqid();
$jsonm = json_encode($message_array);
$msg = new AMQPMessage(
$jsonm,
array(
'correlation_id' => $this->corr_id,
### Not sure which of the next two lines is the correct one... if either....
##'reply_to' => 'amq.rabbitmq.reply-to' # This is when using direct reply-to
'reply_to' => $this->callback_queue
)
);
$this->channel->basic_publish($msg, '', 'ansiblePB_rpc_queue');
while (!$this->response) {
$this->channel->wait();
}
return intval($this->response);
}
}
$ansiblepb_rpc = new RpcClient();
$response = $ansiblepb_rpc->call(array('userID' => 'jb1234',
'user_display_name' => 'Joe Bloe',
'limit' => '24000'));
echo ' [.] Got ', $response, "\n";
?>
Python сервер:
#!/usr/bin/env python
""" 1 """
import glob
import json
import platform
import os
import re
import shutil
import subprocess
import time
import yaml
import pika
class RMQmultireply(object):
""" Generic class to support ansible_playbook on a Rabbit MQ RPC queue"""
def __init__(self, channel, method, props):
#""" Constructor.... duh """
self.channel = channel
self.method = method
self.props = props
def run(self, userID, username, limit):
""" Run the main guts of the service """
cmd = ['/home/dhutchin/devel/rmq/multilineoutput']
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
for line in proc.stdout.readlines():
intermediate_json_result = json.dumps({'type': 'intermediate', 'line': line})
self.channel.basic_publish(exchange='',
routing_key=self.props.reply_to,
properties=pika.BasicProperties(
correlation_id=self.props.correlation_id),
body=str(intermediate_json_result))
#self.channel.basic_ack(delivery_tag=self.method.delivery_tag)
proc.wait()
return proc.returncode
def on_request(channel, method, props, jsonstring):
""" Request has just come in to run ansible_playbook """
playbook = RMQmultireply(channel, method, props)
# fork and exec a playbook
# Recieve each line of output and send them as received back
# to the requestor.
# .run does not return until playbook exits.
# Use "Direct Reply-to" mechanism to return multiple messages to
# our client.
request = yaml.load(jsonstring) # Yes, yaml works better than JSON
returncode = playbook.run(request['userID'], request['user_display_name'], request['limit'])
final_json_result = json.dumps({'type': "final", 'line': '', 'rc': returncode})
channel.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=
props.correlation_id),
body=str(final_json_result))
# Acknowlege the original message so that RabbitMQ can remove it
# from the ansiblePB_rpc_queue queue
channel.basic_ack(delivery_tag=method.delivery_tag)
def main():
""" Its kinda obvious what this does """
try:
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
except Exception:
print "pika.BlockingConnection.... failed... maybe RabbitMQ is not running"
quit()
channel = connection.channel()
channel.queue_declare(queue='ansiblePB_rpc_queue')
channel.basic_qos(prefetch_count=1)
# auto_ack is turned off by default, so we don't need to specify auto_ack=False
channel.basic_consume(queue='ansiblePB_rpc_queue', on_message_callback=on_request)
print " [x] Awaiting RPC requests"
channel.start_consuming()
if __name__ == '__main__':
main()