Что такое действительный @MessagePattern для микросервиса NestJS MQTT? - PullRequest
0 голосов
/ 24 октября 2018

Я пытаюсь настроить MQTT Microservice с использованием NestJS в соответствии с документами .

Я запустил работающий брокер Mosquitto с помощью Docker и проверил его работоспособность с использованием различных клиентов MQTT.Теперь, когда я запускаю службу NestJS, она, кажется, подключается правильно (mqqt.fx показывает нового клиента), но я не могу получить никаких сообщений в моих контроллерах.Это моя начальная загрузка, как в документах:

main.ts

async function bootstrap() {
    const app = await NestFactory.createMicroservice(AppModule, {
        transport: Transport.MQTT,
        options: {
            host: 'localhost',
            port: 1883,
            protocol: 'tcp'
        }
    });
    app.listen(() => console.log('Microservice is listening'));
}
bootstrap();

app.controller.ts

@Controller()
export class AppController {

    @MessagePattern('mytopic') // tried {cmd:'mytopic'} or {topic:'mytopic'}
    root(msg: Buffer) {
        console.log('received: ', msg)
    }
}

Я неправильно использую декоратор шаблонов сообщений или моя концепция ошибочна относительно того, что даже должен делать микросервис NestJS MQTT?Я думал, что это может подписаться на тему, которую я передаю декоратору.Мой единственный другой источник информации - соответствующие модульные тесты

Ответы [ 3 ]

0 голосов
/ 21 января 2019

nest.js Pattern Handler

Со стороны nest.js у нас есть следующий обработчик шаблонов:

@MessagePattern('sum')
sum(data: number[]): number {
  return data.reduce((a, b) => a + b, 0);
}

Как объяснил @ Alexandre , он действительно будет слушатьна sum_ack.


Клиент не-nest.js

Клиент не-nest.js может выглядеть следующим образом (просто сохраните как client.js, запустите npm install mqtt изапустите программу с node client.js):

var mqtt = require('mqtt')
var client  = mqtt.connect('mqtt://localhost:1883')

client.on('connect', function () {
  client.subscribe('sum_res', function (err) {
    if (!err) {
      client.publish('sum_ack', '{"data": [2, 3]}');
    }
  })
})

client.on('message', function (topic, message) {
  console.log(message.toString())
  client.end()
})

Отправляет сообщение по теме sum_ack и прослушивает сообщения на sum_res.Когда он получает сообщение на sum_res, он регистрирует сообщение и завершает программу.nest.js ожидает, что формат сообщения будет {data: myData}, а затем вызовет обработчик параметра sum(myData).

// Log:
{"err":null,"response":5} // This is the response from sum()
{"isDisposed":true} // Internal "complete event" (according to unit test)

Конечно, это не очень удобно ...


Клиент nest.js

Это потому, что он предназначен для использования с другим клиентом nest.js, а не с обычным клиентом mqtt.Клиент nest.js абстрагирует всю внутреннюю логику.См. этот ответ , в котором описывается клиент для redis (для mqtt нужно изменить только две строки).

async onModuleInit() {
  await this.client.connect();
  // no 'sum_ack' or {data: [0, 2, 3]} needed
  this.client.send('sum', [0, 2, 3]).toPromise();
}
0 голосов
/ 22 июля 2019

Я боролся с MQTT сегодня, и это немного помогло мне, но у меня было больше проблем, и ниже вы можете увидеть мои выводы:

Неправильный способ URL-адреса брокера конфигурации

В моем случаекогда я использовал нелокальный сервер MQTT, я начинал с этого:

  const app = await NestFactory.createMicroservice(AppModule, {
    transport: Transport.MQTT,
    options: {
      host: 'test.mosquitto.org',
      port: 1883,
      protocol: 'tcp',
    },
  });
  await app.listenAsync();

, но, как вы можете прочитать в конструкторе ServerMqtt , они используют только опцию url (если ее не предоставили)откат к 'mqtt://localhost:1883'. Хотя у меня нет локального MQTT, он никогда не разрешит app.listenAsync(), который разрешен только на connect , и также не будет запускать какой-либо обработчик.

Он начал работать, когда я настроил код для использования опции url.

  const app = await NestFactory.createMicroservice(AppModule, {
    transport: Transport.MQTT,
    options: {
      url: 'mqtt://test.mosquitto.org:1883',
    },
  });
  await app.listenAsync();

Для сообщений требуется id свойство

Вторая очень странная проблема заключалась в том, что при использовании Сценарий клиента не-nest.js от @KimKern Мне пришлось зарегистрировать два MessagePatterns: sum и sum_ack:

  @MessagePattern('sum')
  sum(data: number[]): number {
    return data.reduce((a, b) => a + b, 0);
  }

  @MessagePattern('sum_ack')
  sumAck(data: number[]): number {
    return data.reduce((a, b) => a + b, 0);
  }

Когда я использовал console.log, я обнаружил, что последнийбежать, но только когда присутствует первый. Вы можете подтолкнуть меня жеssage для брокера с помощью mqtt cli tool, чтобы проверить его:

mqtt pub -t 'sum_ack' -h 'test.mosquitto.org' -m '{"data":[1,2]}'

Но самая большая проблема заключалась в том, что он не ответил (опубликовать sum_res) .

Решением было также предоставить id при отправке сообщения.

mqtt pub -t 'sum_ack' -h 'test.mosquitto.org' -m '{"data":[1,2], "id":"any-id"}'

Тогда мы могли бы удалить MessagePattern 'sum_ack' и оставить только этот код:

  @MessagePattern('sum')
  sum(data: number[]): number {
    return data.reduce((a, b) => a + b, 0);
  }

Причина этого была скрыта внутри handleMessage метода ServerMqtt, который не будет публиковать ответ от обработчика, если сообщение не имело id.

TL / DR Укажите URL для брокера сообщений, используя только опцию url, и всегда предоставляйте id для сообщения.

Надеюсь, это сэкономит времядругим.

Счастливого взлома!

0 голосов
/ 20 января 2019

Документация не очень понятна, но кажется, что для mqtt, если у вас есть @MessagePattern('mytopic'), вы можете опубликовать команду по теме mytopic_ack, и вы получите ответ на mytopic_res.Я все еще пытаюсь выяснить, как публиковать в брокере mqtt из службы.

См. https://github.com/nestjs/nest/blob/e019afa472c432ffe9e7330dc786539221652412/packages/microservices/server/server-mqtt.ts#L99

  public getAckQueueName(pattern: string): string {
    return `${pattern}_ack`;
  }

  public getResQueueName(pattern: string): string {
    return `${pattern}_res`;
  }
...