Потребительский обратный вызов: какая очередь пуста? - PullRequest
0 голосов
/ 09 мая 2018

Я использую функцию MQCB для добавления функций обратного вызова потребителя сообщений для очередей, из которых я читаю. Я пытаюсь прочитать из двух очередей в одном и том же соединении, и, кажется, он работает нормально при получении сообщений: моя функция обратного вызова получает дескриптор объекта для очереди, из которой было получено сообщение.

Однако, когда я получаю событие MQRC_NO_MSG_AVAILABLE (поскольку я установил MQGMO_WAIT для моего потребителя), дескриптор объекта равен MQHO_NONE, поэтому я не могу сказать, к какой очереди относится событие. Я мог бы решить это, поместив дескриптор объекта в контекст обратного вызова, но так ли это должно быть? Или я что-то упускаю здесь очевидное?

Я подключаюсь к администратору очередей под управлением версии 8.0.0.2 в Linux, используя клиентскую библиотеку C версии 8.0.0.5, также в Linux. Вот вывод из моей программы-примера, показывающий, что дескрипторы объекта равны 0:

Opened queue 'AMQ.5A55ED982D616602                            ' with handle 101
Opened queue 'AMQ.5A55ED982D616603                            ' with handle 102
Completion code MQCC_FAILED, reason MQRC_NO_MSG_AVAILABLE, object handle 0
Completion code MQCC_FAILED, reason MQRC_NO_MSG_AVAILABLE, object handle 0

И сама программа:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <cmqc.h>
#include <cmqxc.h>
#include <cmqstrc.h>

void my_message_consumer(MQHCONN, PMQMD, PMQGMO, PMQVOID, PMQCBC);

volatile unsigned events_received = 0;

void
check_fail(const char *action, MQLONG comp_code, MQLONG reason)
{
  if (comp_code != MQCC_OK) {
    fprintf(stderr, "%s failed with %s %s\n",
            action, MQCC_STR(comp_code), MQRC_STR(reason));
    exit(1);
  }
}

int
main()
{
  MQHCONN hconn;
  MQHOBJ hobj1, hobj2;
  MQOD od = {MQOD_DEFAULT};
  char queue_name[MQ_Q_NAME_LENGTH + 1];
  MQLONG c, r;

  MQCONN("", &hconn, &c, &r);
  check_fail("MQCONN", c, r);

  /* Open two dynamic queues */
  strcpy(od.ObjectName, "SYSTEM.DEFAULT.MODEL.QUEUE");
  MQOPEN(hconn, &od, MQOO_INPUT_EXCLUSIVE, &hobj1, &c, &r);
  check_fail("MQOPEN", c, r);

  strncpy(queue_name, od.ObjectName, MQ_Q_NAME_LENGTH);
  queue_name[MQ_Q_NAME_LENGTH] = '\0';
  printf("Opened queue '%48s' with handle %d\n", queue_name, hobj1);

  strcpy(od.ObjectName, "SYSTEM.DEFAULT.MODEL.QUEUE");
  MQOPEN(hconn, &od, MQOO_INPUT_EXCLUSIVE, &hobj2, &c, &r);
  check_fail("MQOPEN", c, r);

  strncpy(queue_name, od.ObjectName, MQ_Q_NAME_LENGTH);
  queue_name[MQ_Q_NAME_LENGTH] = '\0';
  printf("Opened queue '%48s' with handle %d\n", queue_name, hobj2);

  /* Add a callback with zero WaitInterval for both queues */
  MQMD md = {MQMD_DEFAULT};
  MQGMO gmo = {MQGMO_DEFAULT};
  MQCBD cbd = {MQCBD_DEFAULT};
  gmo.Options = MQGMO_NO_SYNCPOINT | MQGMO_WAIT;
  gmo.WaitInterval = 0;
  cbd.CallbackType = MQCBT_MESSAGE_CONSUMER;
  cbd.CallbackFunction = &my_message_consumer;

  MQCB(hconn, MQOP_REGISTER, &cbd, hobj1, &md, &gmo, &c, &r);
  check_fail("MQCB", c, r);
  MQCB(hconn, MQOP_REGISTER, &cbd, hobj2, &md, &gmo, &c, &r);
  check_fail("MQCB", c, r);

  /* Start consuming */
  MQCTLO ctlo = {MQCTLO_DEFAULT};
  MQCTL(hconn, MQOP_START, &ctlo, &c, &r);
  check_fail("MQCTL start", c, r);

  /* Wait until events received */
  while (events_received < 2)
    sleep(1);

  return 0;
}

void
my_message_consumer(MQHCONN hconn, PMQMD md, PMQGMO gmo,
                    PMQVOID buffer, PMQCBC context)
{
  printf("Completion code %s, reason %s, object handle %d\n",
     MQCC_STR(context->CompCode), MQRC_STR(context->Reason),
     context->Hobj);
  events_received++;
}

Скомпилируйте его с помощью:

gcc -o mq-no-msg mq-no-msg.c -g -Wall -I/opt/mqm/inc -L/opt/mqm/lib64 -lmqic_r -Wl,-rpath=/opt/mqm/lib64

и установите переменную окружения MQSERVER перед запуском.

Ответы [ 2 ]

0 голосов
/ 12 мая 2018

Когда вы получили MQRC_NO_MSG_AVAILABLE в качестве события, это означало, что в любых очередях, которые вы зарегистрировали, не было сообщения (соответствующего вашим критериям, если вы указали какие-либо критерии).Так что в этом случае нет необходимости снабжать обратный вызов каким-либо конкретным HObj.

0 голосов
/ 09 мая 2018

Я нашел информацию из нескольких источников по этой теме, что, когда они собраны вместе, рисуют всю картину (к сожалению, MQ KC от IBM не очень хорошо описывает это, если не сказать больше).

  1. На технической конференции Capitalware MQ v2.0.1.3 Мораг Хьюсон выступил с докладом Расширенное программирование приложений WebSphere MQ V7 , в котором содержится некоторая полезная информация.

    На шестой странице говорится:

    • Ваш получатель сообщения также может быть вызван с CallType, установленным в MQCBCT_EVENT_CALL (это также единственный способ, которым обработчик событий может быть называется). Потребителю сообщения будут предоставлены соответствующие события в очередь, которую он потребляет, например, из MQRC_GET_INHIBITED тогда как обработчик событий получает события широкого подключения.
  2. На странице IBM MQ v8 KC MQCBC - Контекст обратного вызова> Поля для MQCBC> Hobj (MQHOBJ) он сообщает:

    Для обработчика событий это значение MQHO_NONE

  3. Предоставленный IBM образец amqscbf0.c также демонстрирует проверку pContext->CallType в MessageConsumer, если это тип MQCBCT_EVENT_CALL, он печатает причину, если это тип MQCBCT_MSG_REMOVED, он печатает сообщение.


На основании приведенной выше информации кажется, что поведение, которое вы видите, является ожидаемым.


Предложенный обходной путь - установить CallbackArea в поле каждой очереди MQCBD с уникальным значением, которое можно использовать для определения очереди, к которой относится событие. к.


На четвертой странице презентации Morag "Расширенное программирование приложений WebSphere MQ V7" говорится следующее:

  • MQGMO_WAIT с MQGMO.WaitInterval = 0 работает так же, как MQGMO_NO_WAIT, когда используется на MQGET, но в случае асинхронные потребители, мы хотим, чтобы потребитель не опрашивал в этом случае цикл занят, поэтому он работает скорее как маркер обратного останова показать, когда был достигнут конец пакета сообщений.

В таблице на той же странице под столбцом «Асинхронное потребление» для MQGMO_WAIT with MQGMO.WaitInterval = 0 указано:

Вызывается только с MQRC_NO_MSGS_AVAILABLE, если только что начался или имел сообщение с последнего 2033

Ваш потребитель не будет непрерывно получать события, уведомляющие его об отсутствии сообщений в очереди. Событие генерируется только в том случае, если нет сообщений при первом запуске обратного вызова и / или после каждого прочтения ВСЕХ сообщений (GET) из очереди. По сути, это позволяет вам знать, что в настоящее время больше нет доступных сообщений после того, как он прочитал хотя бы одно сообщение. Это может быть полезно, если вы ожидаете пакеты сообщений и хотите выполнить какое-либо действие после того, как все сообщения в пакете будут считаны из очереди.

  • Обратите внимание, что MQGMO_NO_WAIT и MQGMO_WAIT с параметром WaitInterval, равным MQWI_UNLIMITED сильно отличается при передаче в MQGET, но с MQCB называют их поведение одинаковым. Потребитель будет только пропущенные сообщения и события, никогда не будет передан код причины указав нет сообщений. Фактически MQGMO_NO_WAIT будет рассматриваться как неопределенное ожидание. Это должно предотвратить бесконечный потребитель вызывается с кодом причины отсутствия сообщений.

Если вам действительно не нужны сообщения о событиях MQRC_NO_MSG_AVAILABLE, то, вероятно, MQGMO_NO_WAIT - это путь.

...