Я написал php-работника для извлечения данных из очереди RabbitMQ. Php-работник успешно работает в качестве фонового задания на каком-либо сервере, который подключается к серверу RabbitMQ для использования данных с использованием расширения php AMQP. После этого в очереди нет данныхв течение 15 минут php-скрипты генерируют исключение AMQPException.Исключение:
Объект AMQPException (
[message:protected] => Library error: a socket error occurred
[string:Exception:private] =>
[code:protected] => 0
[file:protected] => /home/indiamart/public_html/dev-weberp-auto-dialer/merp/devworker.php
[line:protected] => 104
[trace:Exception:private] => Array
(
[0] => Array
(
[file] => /home/indiamart/public_html/dev-weberp-auto-dialer/merp/devworker.php
[line] => 104
[function] => consume
[class] => AMQPQueue
[type] => ->
[args] => Array
(
[0] => Closure Object
(
[parameter] => Array
(
[$message] => <required>
[$q] => <required>
)
)
)
)
)
[previous:Exception:private] =>
)
Ниже мой рабочий код:
<code> <?php
$callback_func = function(AMQPEnvelope $message, AMQPQueue $q){
$data = json_decode($message->getBody(), true);
$getDeliveryTag = $message->getDeliveryTag();
$ack = $q->ack($getDeliveryTag);//used $getDeliveryTag
$to = isset($data["to"])?$data["to"]:"";
$subjectMail = isset($data["subject"])?$data["subject"]:"";
$mail_body_content = isset($data["body"])?"<pre>".$data["body"]."
":" "; $ mailfrom = isset ($ data ["mailfrom"])? $ data ["mailfrom"]: ""; $ cc = isset ($ data ["cc"])? $ data ["cc"]: ""; $ mailfromname = isset ($ data ["mailfromname"])? $ data ["mailfromname"]: ""; $ uniqueid = isset ($ data ["unique_id"])? $ data ["unique_id"]: ""; if (! $ ack) {// если ack не получен $ ack_msg = "Уникальный идентификатор: $ uniqueid Тема: $ subjectMail"; @mail ("abc@example.com", "Очередь истории не подтверждена!", $ ack_msg);} if ($ message-> isRedelivery ()) {$ red_msg = "Уникальный идентификатор: $ uniqueid Тема: $ subjectMail"; @mail ("abc@example.com", "Повторная отправка очереди истории!", $ red_msg);} $ m_headers_trail = "From: $ mailfromname <$ mailfrom> \ n". "Cc: $ cc \ n". "MIME-Version: 1.0 \ n". "Тип содержимого: text / html;charset = UTF-8 "; $ flag = @mail ($ to, $ subjectMail, $ mail_body_content, $ m_headers_trail); $ flag_message = $ flag?" уникальный идентификатор успеха: $ uniqueid Тема: $ subjectMail ":" почта не удалась ";@mail ("abc@example.com", "Результат отправителя истории", $ flag_message);}; $ host = "127.0.0.1"; $ vhost = "/"; $ port = 5672; $ login = "admin"; $ password = "admin"; CHANNEL: try {@mail ("abc@example.com", "Начало очереди истории!", "Работник запущен!"); $ cnn = new AMQPConnection (array ("host" => $ host, "vhost" => $ vhost, "port" => $ port, "login" => $ login, "password" => $ password)); $ cnn-> connect (); if (! $cnn) {@mail ("abc@example.com", "Ошибка подключения к истории!", "Соединение не установлено!");} $ ch = new AMQPChannel ($ cnn); $ queue = new AMQPQueue ($ ch); $ queue-> setName ('STS_UPDATE_MAIL'); $ queue-> setFlags (AMQP_NOPARAM); $ queue-> потребление ($ callback_func); $ ch-> close (); $ cnn-> close ();} catch (Исключение $ e) {if ($ ch-> isConnected ()) {$ ch-> close (); @mail ("abc@example.com", "Соединение закрытия истории!", "Закрыто!");}если ($ сnn-> isConnected ()) {$ cnn-> close ();@mail ("abc@example.com", "Соединение закрытия очереди истории!", "Закрыто!");} перейти к КАНАЛУ;}?>