Не получено сообщение Azure-iothub, отправленное из функции Azure-Event-Hubs onMessage - PullRequest
0 голосов
/ 31 мая 2018

У меня есть веб-API, который может получать и отправлять сообщения Raspberry Pi.Соединение работает нормально, используя Azure-Event-Hubs в веб-API для получения сообщений и Azure-Iothub для отправки сообщений в Raspberry.

Проблема, с которой я сталкиваюсь, это когда я пытаюсь отправить сообщение вфункция onMessage (поэтому всякий раз, когда я получаю сообщение в webapi), устройство не получает его.Вот мой код:

WebApi:

const { EventHubClient, EventPosition } = require('azure-event-hubs');
var connectionString = 'myConnectionString'
var sendingClient = require('../azure/sendingClient')

async function main() {
    sendingClient.sendMessage('raspberry',{},"allDevices")  //The raspberry receives this  
    const client = await 
    EventHubClient.createFromIotHubConnectionString(connectionString);

    const onError = (err) => {
        console.log("An error occurred on the receiver ", err);
    };

    const onMessage = (msg) => {
        console.log(msg.body);
        sendingClient.sendMessage('raspberry',{},"allDevices")// the raspberry doesn't receive this
    };

    const receiveHandler = client.receive("1", onMessage, onError, { 
        eventPosition: EventPosition.fromEnqueuedTime(Date.now()) 
    });

  // To stop receiving events later on...
  await receiveHandler.stop();
  await client.close();
}

main().catch((err) => {
    console.log(err);
});

Отправляющий клиент:

var Client = require('azure-iothub').Client;
var Message = require('azure-iot-common').Message;

var connectionString = 'myConnectionString'

var sendingClient = Client.fromConnectionString(connectionString);

exports.sendMessage = (targetDevice, content, messageId) => {
    sendingClient.open(function (err) {
        if (err) {
            console.error('Could not connect: ' + err.message);
        } else {
            console.log('Service client connected');
            var message = new Message(content);
            message.ack = 'full';
            message.messageId = 'message'
            message.properties.add('message',messageId)
            console.log('Sending message: ' + message.getData());
            console.log('Sending message to : ' + targetDevice);            
            sendingClient.send(targetDevice, message,);
        }
    });
}

Получатель на raspberryPi:

var iothub = require('azure-iothub');
var Protocol = require('azure-iot-device-mqtt').Mqtt;
var Client = require('azure-iot-device').Client;
var Message = require('azure-iot-device').Message;
var client = Client.fromConnectionString(connectionString, Protocol)
client.open((err) => {
    if (err) console.error('Could not connect: ' + err.message)
    else {
        client.on('message', (msg) => {
            switch (msg.properties.propertyList[1].value) {
                case 'allDevices':
                    devices = JSON.parse(msg.data.toString())
                    response(devices) //passing the message content
            }
        });

        client.on('error', (err) => {
            console.error(err.message);
        });

        client.on('disconnect', () => {
            clearInterval(sendInterval);
            client.removeAllListeners();
            client.open(connectCallback);
        });
    }
})

отправитель включенМалинаПи:

exports.sendMessage = (data, connectionString, key) => {
    var client = Client.fromConnectionString(connectionString, Protocol)
    client.open((err) => {
        if (err) console.error('Send message error: ' + err.message)
        else {
            data = JSON.stringify(data);
            var message = new Message(data);
            message.properties.add('message', key);
            client.sendEvent(message);
            console.log('Message sent ' + key);
        }
    })
}

1 Ответ

0 голосов
/ 01 июня 2018

Я думаю, это связано с тем, что клиент-концентратор событий не может получать сообщения из раздела «1».Невозможно отправить сообщение определенному разделу в IoT Hub. Разделы используются внутри, чтобы обеспечить масштабирование IoT (Event Hub) и позволить масштабировать приложение-потребитель.Вы можете попробовать следующий код для получения сообщений со всех разделов.

  const partitionIds = await client.getPartitionIds();
  partitionIds.forEach(function(id,index){
    const receiveHandler = client.receive(id, onMessage, onError, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });  
  });

Обновление:

WebApi.js

const { EventHubClient, EventPosition } = require('azure-event-hubs');
var connectionString = '{iot-hub-connectionstring}';
var sendingClient = require('./sendingClient');

async function main() {
    sendingClient.sendMessage('raspberry',{},"raspberry-first")  //The raspberry receives this  
    const client = await EventHubClient.createFromIotHubConnectionString(connectionString);

    const onError = (err) => {
        console.log("An error occurred on the receiver ", err);
    };

    const onMessage = (msg) => {
        console.log("receive response");
        console.log(msg.body);
        sendingClient.sendMessage('raspberry',{},"raspberry-second")// the raspberry doesn't receive this
    };

    const partitionIds = await client.getPartitionIds();
    partitionIds.forEach(function(id,index){
        const receiveHandler = client.receive(id, onMessage, onError, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });  
    });  
}

main().catch((err) => {
    console.log(err);
});

sendClient.js

var Client = require('azure-iothub').Client;
var Message = require('azure-iot-common').Message;

var connectionString = '{iot-hub-connectionstring}';

var sendingClient = Client.fromConnectionString(connectionString);

exports.sendMessage = (targetDevice, content, messageId) => {
    sendingClient.open(function (err) {
        if (err) {
            console.error('Could not connect: ' + err.message);
        } else {
            console.log('Service client connected');
            var message = new Message(content);
            message.ack = 'full';
            message.messageId = 'message'
            message.properties.add('message',messageId)
            console.log('Sending message: ' + message.getData());
            console.log('Sending message to : ' + targetDevice);            
            sendingClient.send(targetDevice, message,);
        }
    });
}

Receiver.js varProtocol = require ('azure-iot-device-mqtt'). Mqtt;var Client = require ('azure-iot-device'). Client;var Message = require ('azure-iot-device'). Message;

var connectionString = "{iot-hub-device-connectionstring}";

// fromConnectionString must specify a transport constructor, coming from any transport package.
var client = Client.fromConnectionString(connectionString, Protocol);

var connectCallback = function (err) {
  if (err) {
    console.error('Could not connect: ' + err.message);
  } else {

    console.log('Client connected');
    client.on('message', function (msg) {

        // When using MQTT the following line is a no-op.
        client.complete(msg, printResultFor('completed'));
        console.log(msg.properties.propertyList[1].value);              
    });

    client.on('error', function (err) {
      console.error(err.message);
    });

    client.on('disconnect', function () {
      client.removeAllListeners();
      client.open(connectCallback);
    });
  }
};

client.open(connectCallback);

// Helper function to print results in the console
function printResultFor(op) {
  return function printResult(err, res) {
    if (err) console.log(op + ' error: ' + err.toString());
    if (res) console.log(op + ' status: ' + res.constructor.name);
  };
}

Результаты теста enter image description here

BTW,Я обновил все библиотеки Azure- * до последней версии.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...