Почему ActiveMQ доставляет дубликаты сообщений моему потребителю PHP через Stomp? - PullRequest
0 голосов
/ 09 июня 2018

Я не уверен, относится ли этот вопрос к stomp-php или ActiveMQ Docker (работает по умолчанию).

У меня есть простой вспомогательный класс Queueнаписанный на PHP, который обрабатывает как отправку сообщения в очередь (Queue::push), так и его использование (Queue::fetch).См. Код ниже.

Как видите, fetch() должен подписаться на очередь, прочитать одно сообщение и отписаться.Сообщение должно быть подтверждено автоматически (\Stomp\StatefulStomp::subscribe(), 3-й аргумент).

По какой-то причине около 5-7% сообщений принимаются заказчиком дважды или даже трижды. Почему сообщения доставляются несколько раз и как этого избежать?


Publisher (отправка 1000 сообщений):

$mq = new Queue('tcp://activemq:61613','test');
for ($msgCount = 0; $msgCount < 1000; $msgCount++) {
    $mq->push('Message #' . $msgCount);
}

Потребитель (получает ~ 1070 сообщений):

$mq = new Queue('tcp://activemq:61613','test');
$received = 0;
while (true) {
    $message = $mq->fetch();
    if (null === $message) { break; }
    $received++;
}

Код класса очереди:

use Stomp\Client;
use Stomp\Network\Connection;
use Stomp\SimpleStomp;
use Stomp\StatefulStomp;
use Stomp\Transport\Message;

class Queue
{
    /**
     * @var \Stomp\StatefulStomp
     */
    private $stomp;

    private $queue;

    public function __construct($uri, $queue) {
        $connection = new Connection('tcp://activemq:61613');
        $this->stomp = new StatefulStomp(new Client($connection));
        $connection->setReadTimeout(1);
        $this->queue = $queue;
    }

    public function push($body) {
        $message = new Message($body, ['activemq.maximumRedeliveries' => 0]);
        $this->stomp->send('/queue/' . $this->queue, $message);
    }
    public function fetch() {
        $subscriptionId = $this->stomp->subscribe('/queue/' . $this->queue, null, 'auto', ['activemq.prefetchSize' => 1]);
        $msg = $this->stomp->read();
        $this->stomp->unsubscribe($subscriptionId);
        return $msg;
    }
}
...