Передача сообщения Solace с использованием событий - PullRequest
0 голосов
/ 23 мая 2019

Я пытаюсь опубликовать сообщение Solace в теме, а затем, получив его в подписчике, переслать это сообщение через события в другую часть моего приложения для дальнейшей обработки. Все это происходит в одном процессе node.js. Я знаю, что это может быть не лучшим практическим способом ведения дел, поэтому не стесняйтесь указать мне лучшее направление. Я знаю, что вместо событий я могу использовать очереди сообщений, но я хотел бы посмотреть, можно ли сделать это с помощью событий.

У меня есть следующие файлы:
server.js : код инициализации сервера
TopicPublisher.js : публикует сообщение Solace в теме
TopicSubscriber.js : подписывается на тему Solace, чтобы получать сообщения, а затем отправляет событие с содержимым сообщения
SocketWrapper.js : получает событие, отправленное TopicSubscriber.js и выполняет дальнейшую обработку

server.js

/**
 * Starts the dashboard server: serves the static files under ./dist over
 * HTTP on port 8080 and accepts socket.io connections. Every connected
 * socket is tracked locally and also handed to a SocketWrapper instance,
 * which is started when the first socket connects.
 */
function initServer() {
    const express = require('express');
    const app = express();
    const server = require('http').createServer(app);
    // Declared with const: the previous assignment had no declaration keyword
    // and therefore created an implicit global named `io`.
    const io = require('socket.io')(server);

    const SocketWrapper = require('./SocketWrapper');
    // The wrapper must be instantiated before the connection handler uses it;
    // previously `socketWrapper` was referenced without ever being created.
    // NOTE(review): the SocketWrapper constructor accepts an ECNs argument;
    // nothing here shows what should be passed, so it is left undefined.
    const socketWrapper = new SocketWrapper();
    const sockets = new Set();

    //This example emits to individual sockets (track by sockets Set above).
    //Could also add sockets to a "room" as well using socket.join('roomId')
    //https://socket.io/docs/server-api/#socket-join-room-callback

    app.use(express.static(__dirname + '/dist'));

    // Every new connection comes with its own socket.
    io.on('connection', socket => {

        sockets.add(socket);
        // The wrapper keeps its own set of sockets to emit to.
        socketWrapper.sockets.add(socket);
        console.log(`Socket ${socket.id} added`);

        socket.on('clientdata', data => {
            console.log(data);
        });

        // Start the wrapper only once, after at least one socket is registered.
        if (!socketWrapper.started) {
            socketWrapper.start();
        }

        socket.on('disconnect', () => {
            console.log(`Deleting socket: ${socket.id}`);
            sockets.delete(socket);
            // Also drop the socket from the wrapper so it stops emitting to a
            // closed connection and the socket object can be released.
            socketWrapper.sockets.delete(socket);
            console.log(`Remaining sockets: ${sockets.size}`);
        });

    });

    server.listen(8080);
    console.log('Visit http://localhost:8080 in your browser');

}

initServer();
// Load the subscriber immediately. require() takes only a module id, so the
// extra argument that used to be passed here (1000) was ignored and no delay
// was ever applied to this call.
require('./solace/TopicSubscriber');
// Load the publisher 1500ms later so the subscriber has time to finish its
// asynchronous connect/subscribe before the message is published.
setTimeout(() => { require('./solace/TopicPublisher'); }, 1500);

Я знаю, что подход с setTimeout() хакерский — я просто пытаюсь заставить работать эту версию.

TopicSubscriber.js

const solace = require('solclientjs').debug; // logging supported
const eventEmitter = require('../EventEmitter');
const fs = require('fs');
const path = require('path');

// Pull the pieces of the Solace API used for one-time factory setup.
const {
    SolclientFactory,
    SolclientFactoryProfiles,
    SolclientFactoryProperties,
    LogLevel,
} = solace;

// Initialize the factory with the version10 profile defaults.
const factoryProps = new SolclientFactoryProperties();
factoryProps.profile = SolclientFactoryProfiles.version10;
SolclientFactory.init(factoryProps);

// Send API log output to the JavaScript console at WARN level.
// NOTICE: works only with ('solclientjs').debug
SolclientFactory.setLogLevel(LogLevel.WARN);

/**
 * Builds a subscriber object that connects to a Solace message router,
 * subscribes to one topic and re-emits the binary attachment of every
 * received message as a 'heartbeat-msg' event on the shared eventEmitter.
 *
 * @param {object} solaceModule - The Solace API object; this code uses its
 *     SolclientFactory and SessionEventCode members.
 * @param {string} topicName - Name of the topic to subscribe to.
 * @returns {object} The subscriber, exposing log, run, connect, subscribe,
 *     unsubscribe, disconnect and exit, plus the session, topicName and
 *     subscribed state fields.
 */
const TopicSubscriber = function (solaceModule, topicName) {
    'use strict';
    const solace = solaceModule;
    const subscriber = {};
    subscriber.session = null;
    subscriber.topicName = topicName;
    subscriber.subscribed = false;

    // Logger: prefixes the line with a zero-padded [HH:MM:SS] local timestamp.
    subscriber.log = function (line) {
        const now = new Date();
        const time = [now.getHours(), now.getMinutes(), now.getSeconds()]
            .map(function (part) { return ('0' + part).slice(-2); });
        const timestamp = '[' + time.join(':') + '] ';
        console.log(timestamp + line);
    };

    subscriber.log('\n*** Subscriber to topic "' + subscriber.topicName + '" is ready to connect ***');

    // main function
    subscriber.run = function (argv) {
        subscriber.connect(argv);
    };

    /**
     * Establishes the connection to the Solace message router.
     *
     * @param {string[]} argv - [url, "<client-username>@<message-vpn>",
     *     password]. With fewer than three entries the process is terminated.
     */
    subscriber.connect = function (argv) {
        if (subscriber.session !== null) {
            subscriber.log('Already connected and ready to subscribe.');
            return;
        }
        // extract params
        if (argv.length < 3) { // expecting 3 real arguments
            subscriber.log('Cannot connect: expecting all arguments' +
                ' <protocol://host[:port]> <client-username>@<message-vpn> <client-password>.\n' +
                'Available protocols are ws://, wss://, http://, https://, tcp://, tcps://');
            process.exit();
        }
        const hosturl = argv[0];
        subscriber.log('Connecting to Solace message router using url: ' + hosturl);
        const usernamevpn = argv[1];
        const [username, vpn] = usernamevpn.split('@');
        subscriber.log('Client username: ' + username);
        subscriber.log('Solace message router VPN name: ' + vpn);
        const pass = argv[2];
        // create session
        try {
            subscriber.session = solace.SolclientFactory.createSession({
                // solace.SessionProperties
                url: hosturl,
                vpnName: vpn,
                userName: username,
                password: pass,
            });
        } catch (error) {
            subscriber.log(error.toString());
            // No session was created, so there is nothing to attach listeners
            // to; continuing would dereference null and throw a TypeError.
            subscriber.session = null;
            return;
        }
        // define session event listeners
        subscriber.session.on(solace.SessionEventCode.UP_NOTICE, function () {
            subscriber.log('=== Successfully connected and ready to subscribe. ===');
            subscriber.subscribe();
        });
        subscriber.session.on(solace.SessionEventCode.CONNECT_FAILED_ERROR, function (sessionEvent) {
            subscriber.log('Connection failed to the message router: ' + sessionEvent.infoStr +
                ' - check correct parameter values and connectivity!');
        });
        subscriber.session.on(solace.SessionEventCode.DISCONNECTED, function () {
            subscriber.log('Disconnected.');
            subscriber.subscribed = false;
            if (subscriber.session !== null) {
                subscriber.session.dispose();
                subscriber.session = null;
            }
        });
        subscriber.session.on(solace.SessionEventCode.SUBSCRIPTION_ERROR, function (sessionEvent) {
            subscriber.log('Cannot subscribe to topic: ' + sessionEvent.correlationKey);
        });
        // The same event confirms both subscribe and unsubscribe requests, so
        // the current `subscribed` flag decides which one was just confirmed.
        subscriber.session.on(solace.SessionEventCode.SUBSCRIPTION_OK, function (sessionEvent) {
            if (subscriber.subscribed) {
                subscriber.subscribed = false;
                subscriber.log('Successfully unsubscribed from topic: ' + sessionEvent.correlationKey);
            } else {
                subscriber.subscribed = true;
                subscriber.log('Successfully subscribed to topic: ' + sessionEvent.correlationKey);
                subscriber.log('=== Ready to receive messages. ===');
            }
        });
        // define message event listener
        subscriber.session.on(solace.SessionEventCode.MESSAGE, function (message) {
            const payload = message.getBinaryAttachment();
            subscriber.log('Received message: "' + payload + '", details:\n' +
                message.dump());

            // Forward the payload to the rest of the application. emit()
            // returns false when no listener is registered for the event.
            const eventStatus = eventEmitter.emit('heartbeat-msg', payload);
            console.log(`eventStatus: ${eventStatus}`);
        });
        // connect the session
        try {
            subscriber.session.connect();
        } catch (error) {
            subscriber.log(error.toString());
        }
    };

    // Subscribes to topic on Solace message router
    subscriber.subscribe = function () {
        if (subscriber.session === null) {
            subscriber.log('Cannot subscribe because not connected to Solace message router.');
            return;
        }
        if (subscriber.subscribed) {
            subscriber.log('Already subscribed to "' + subscriber.topicName
                + '" and ready to receive messages.');
            return;
        }
        subscriber.log('Subscribing to topic: ' + subscriber.topicName);
        try {
            subscriber.session.subscribe(
                solace.SolclientFactory.createTopicDestination(subscriber.topicName),
                true, // generate confirmation when subscription is added successfully
                subscriber.topicName, // use topic name as correlation key
                10000 // 10 seconds timeout for this operation
            );
        } catch (error) {
            subscriber.log(error.toString());
        }
    };

    // Unsubscribes, disconnects and then terminates the process.
    subscriber.exit = function () {
        subscriber.unsubscribe();
        subscriber.disconnect();
        setTimeout(function () {
            process.exit();
        }, 1000); // wait for 1 second to finish
    };

    // Unsubscribes from topic on Solace message router
    subscriber.unsubscribe = function () {
        if (subscriber.session === null) {
            subscriber.log('Cannot unsubscribe because not connected to Solace message router.');
            return;
        }
        if (!subscriber.subscribed) {
            subscriber.log('Cannot unsubscribe because not subscribed to the topic "'
                + subscriber.topicName + '"');
            return;
        }
        subscriber.log('Unsubscribing from topic: ' + subscriber.topicName);
        try {
            subscriber.session.unsubscribe(
                solace.SolclientFactory.createTopicDestination(subscriber.topicName),
                true, // generate confirmation when subscription is removed successfully
                subscriber.topicName, // use topic name as correlation key
                10000 // 10 seconds timeout for this operation
            );
        } catch (error) {
            subscriber.log(error.toString());
        }
    };

    // Gracefully disconnects from Solace message router
    subscriber.disconnect = function () {
        subscriber.log('Disconnecting from Solace message router...');
        if (subscriber.session === null) {
            subscriber.log('Not connected to Solace message router.');
            return;
        }
        try {
            subscriber.session.disconnect();
        } catch (error) {
            subscriber.log(error.toString());
        }
    };

    return subscriber;
};

// Create the subscriber for the topic; without this the `subscriber`
// identifier used below is undefined and the module throws on load.
const subscriber = new TopicSubscriber(solace, 'tutorial/topic');

// Expected entries: url, "<client-username>@<message-vpn>", password.
const connectionArgs = []; //not provided for security reasons
subscriber.run(connectionArgs);


// On SIGINT (Ctrl+C) unsubscribe and disconnect before the process exits.
process.on('SIGINT', function () {
    'use strict';
    subscriber.exit();
});

Найдите строку //TODO: where the event emitting occurs, чтобы найти соответствующую часть кода.

TopicPublisher.js

/**
 * Builds a publisher object that connects to a Solace message router,
 * publishes one DIRECT message to a topic as soon as the session is up and
 * then disconnects.
 *
 * @param {object} solaceModule - The Solace API object; this code uses its
 *     SolclientFactory, SessionEventCode and MessageDeliveryModeType members.
 * @param {string} topicName - Name of the topic to publish to.
 * @returns {object} The publisher, exposing log, run, connect, publish,
 *     disconnect and exit, plus the session and topicName fields.
 */
const TopicPublisher = function (solaceModule, topicName) {
    'use strict';
    const solace = solaceModule;
    const publisher = {};
    publisher.session = null;
    publisher.topicName = topicName;

    // Logger: prefixes the line with a zero-padded [HH:MM:SS] local timestamp.
    publisher.log = function (line) {
        const now = new Date();
        const time = [now.getHours(), now.getMinutes(), now.getSeconds()]
            .map(function (part) { return ('0' + part).slice(-2); });
        const timestamp = '[' + time.join(':') + '] ';
        console.log(timestamp + line);
    };

    publisher.log('\n*** Publisher to topic "' + publisher.topicName + '" is ready to connect ***');

    // main function
    publisher.run = function (argv) {
        publisher.connect(argv);
    };

    /**
     * Establishes the connection to the Solace message router.
     *
     * @param {string[]} argv - [url, "<client-username>@<message-vpn>",
     *     password]. With fewer than three entries the process is terminated.
     */
    publisher.connect = function (argv) {
        if (publisher.session !== null) {
            publisher.log('Already connected and ready to publish.');
            return;
        }
        // extract params
        if (argv.length < 3) { // expecting 3 real arguments
            publisher.log('Cannot connect: expecting all arguments' +
                ' <protocol://host[:port]> <client-username>@<message-vpn> <client-password>.\n' +
                'Available protocols are ws://, wss://, http://, https://, tcp://, tcps://');
            process.exit();
        }
        const hosturl = argv[0];
        publisher.log('Connecting to Solace message router using url: ' + hosturl);
        const usernamevpn = argv[1];
        const [username, vpn] = usernamevpn.split('@');
        publisher.log('Client username: ' + username);
        publisher.log('Solace message router VPN name: ' + vpn);
        const pass = argv[2];
        // create session
        try {
            publisher.session = solace.SolclientFactory.createSession({
                // solace.SessionProperties
                url: hosturl,
                vpnName: vpn,
                userName: username,
                password: pass,
            });
        } catch (error) {
            publisher.log(error.toString());
            // No session was created, so there is nothing to attach listeners
            // to; continuing would dereference null and throw a TypeError.
            publisher.session = null;
            return;
        }
        // define session event listeners
        publisher.session.on(solace.SessionEventCode.UP_NOTICE, function () {
            publisher.log('=== Successfully connected and ready to publish messages. ===');
            publisher.publish();
            publisher.exit();
        });
        publisher.session.on(solace.SessionEventCode.CONNECT_FAILED_ERROR, function (sessionEvent) {
            publisher.log('Connection failed to the message router: ' + sessionEvent.infoStr +
                ' - check correct parameter values and connectivity!');
        });
        publisher.session.on(solace.SessionEventCode.DISCONNECTED, function () {
            publisher.log('Disconnected.');
            if (publisher.session !== null) {
                publisher.session.dispose();
                publisher.session = null;
            }
        });
        // connect the session
        try {
            publisher.session.connect();
        } catch (error) {
            publisher.log(error.toString());
        }
    };

    /**
     * Publishes one message to the topic.
     *
     * @param {string} [messageText='SOM: 0'] - Text sent as the binary
     *     attachment; defaults to the sample message.
     */
    publisher.publish = function (messageText = 'SOM: 0') {
        if (publisher.session === null) {
            publisher.log('Cannot publish because not connected to Solace message router.');
            return;
        }
        const message = solace.SolclientFactory.createMessage();
        message.setDestination(solace.SolclientFactory.createTopicDestination(publisher.topicName));
        message.setBinaryAttachment(messageText);
        message.setDeliveryMode(solace.MessageDeliveryModeType.DIRECT);
        publisher.log('Publishing message "' + messageText + '" to topic "' + publisher.topicName + '"...');
        try {
            publisher.session.send(message);
            publisher.log('Message published.');
        } catch (error) {
            publisher.log(error.toString());
        }
    };

    // Disconnects only. The process is deliberately left running; presumably
    // because the publisher shares its process with the server - confirm.
    publisher.exit = function () {
        publisher.disconnect();
    };

    // Gracefully disconnects from Solace message router
    publisher.disconnect = function () {
        publisher.log('Disconnecting from Solace message router...');
        if (publisher.session === null) {
            publisher.log('Not connected to Solace message router.');
            return;
        }
        try {
            publisher.session.disconnect();
        } catch (error) {
            publisher.log(error.toString());
        }
    };

    return publisher;
};

const solace = require('solclientjs').debug; // logging supported
const fs = require('fs');
// Pull the pieces of the Solace API used for one-time factory setup.
const {
    SolclientFactory,
    SolclientFactoryProfiles,
    SolclientFactoryProperties,
    LogLevel,
} = solace;

// Initialize the factory with the version10 profile defaults.
const factoryProps = new SolclientFactoryProperties();
factoryProps.profile = SolclientFactoryProfiles.version10;
SolclientFactory.init(factoryProps);

// Send API log output to the JavaScript console at WARN level.
// NOTICE: works only with ('solclientjs').debug
SolclientFactory.setLogLevel(LogLevel.WARN);

// Build the publisher for the topic that the subscriber listens on.
const publisher = new TopicPublisher(solace, 'tutorial/topic');

//TODO: remove single service approach?
// Expected entries: url, "<client-username>@<message-vpn>", password.
const connectionArgs = []; //removed for security reasons
publisher.run(connectionArgs);

SocketWrapper.js

const eventEmitter = require('./EventEmitter');

/**
 * Relays 'heartbeat-msg' events from the shared eventEmitter to every
 * socket registered in `sockets`.
 */
class SocketWrapper {
    /**
     * @param {*} ECNs - Stored on the instance as-is; not otherwise used by
     *     this class.
     */
    constructor(ECNs) {
        this.started = false;
        this.ECNs = ECNs;
        this.sockets = new Set();
        // Holds the registered 'heartbeat-msg' listener so that repeated
        // start() calls do not register it again.
        this.heartbeatListener = null;
    }

    /**
     * Marks the wrapper as started when at least one socket is registered
     * and makes sure the 'heartbeat-msg' listener is attached exactly once.
     */
    start() {
        console.log('starting socketWrapper');

        this.started = this.sockets.size > 0;
        if (!this.started) {
            console.log(`Dashboard service stopped.`);
        }

        // A second registration would make every socket receive each
        // heartbeat twice, so attach the listener only on the first call.
        if (this.heartbeatListener !== null) {
            return;
        }
        this.heartbeatListener = (msg) => {
            console.log('in eventEmitter.on');
            const ecnStatus = this.parseMessage(msg);
            for (const s of this.sockets) {
                console.log(`Emitting value: ${ecnStatus['ECN']}:${ecnStatus['status']}`);
                s.emit(ecnStatus['ECN'],
                    {
                        'ECN': ecnStatus['ECN'],
                        'status': ecnStatus['status']
                    }
                );
            }
        };
        eventEmitter.on('heartbeat-msg', this.heartbeatListener);
    }

    /**
     * Parses a message of the form "<ECN>: <status>".
     *
     * Only the first ':' separates the two parts, so a status may itself
     * contain colons; surrounding whitespace is trimmed from both parts.
     *
     * @param {*} msg - Message payload; converted with String() first so a
     *     non-string payload (for example a Buffer) does not throw.
     * @returns {{ECN: string, status: (string|undefined)}} Parsed parts;
     *     status is undefined when the message contains no ':'.
     */
    parseMessage(msg) {
        const text = String(msg);
        const separatorIndex = text.indexOf(':');
        if (separatorIndex === -1) {
            return {
                'ECN': text.trim(),
                'status': undefined
            };
        }
        return {
            'ECN': text.slice(0, separatorIndex).trim(),
            'status': text.slice(separatorIndex + 1).trim()
        };
    }
}

module.exports = SocketWrapper;

Найдите строку //TODO: create event listener, чтобы найти соответствующую часть кода.

Ожидается:

$ node server.js
Visit http://localhost:8080 in your browser
[12:11:36]
*** Subscriber to topic "tutorial/topic" is ready to connect ***
[12:11:36] Connecting to Solace message router using url: ws://soluatph1v.wellsfargo.net
[12:11:36] Client username: etrade
[12:11:36] Solace message router VPN name: REMOVED
[12:11:36] === Successfully connected and ready to subscribe. ===
[12:11:36] Subscribing to topic: tutorial/topic
[12:11:36] Successfully subscribed to topic: tutorial/topic
[12:11:36] === Ready to receive messages. ===
[12:11:37]
*** Publisher to topic "tutorial/topic" is ready to connect ***
[12:11:37] Connecting to Solace message router using url: REMOVED
[12:11:37] Client username: REMOVED
[12:11:37] Solace message router VPN name: REMOVED
[12:11:37] === Successfully connected and ready to publish messages. ===
[12:11:37] Publishing message "SOM: 0" to topic "tutorial/topic"...
[12:11:37] Message published.
[12:11:37] Disconnecting from Solace message router...
[12:11:37] Disconnected.
[12:11:37] Received message: "SOM: 0", details:
Destination:                            [Topic tutorial/topic]
Class Of Service:                       COS1
DeliveryMode:                           DIRECT
Binary Attachment:                      len=6
  53 4f 4d 3a 20 30                                     SOM:.0

in eventEmitter.on
eventStatus: true

Последние две строки текста, которые я ищу, демонстрируют, что обработка события работала.
Факт:

$ node server.js
Visit http://localhost:8080 in your browser
[12:11:36]
*** Subscriber to topic "tutorial/topic" is ready to connect ***
[12:11:36] Connecting to Solace message router using url: ws://soluatph1v.wellsfargo.net
[12:11:36] Client username: etrade
[12:11:36] Solace message router VPN name: REMOVED
[12:11:36] === Successfully connected and ready to subscribe. ===
[12:11:36] Subscribing to topic: tutorial/topic
[12:11:36] Successfully subscribed to topic: tutorial/topic
[12:11:36] === Ready to receive messages. ===
[12:11:37]
*** Publisher to topic "tutorial/topic" is ready to connect ***
[12:11:37] Connecting to Solace message router using url: REMOVED
[12:11:37] Client username: REMOVED
[12:11:37] Solace message router VPN name: REMOVED
[12:11:37] === Successfully connected and ready to publish messages. ===
[12:11:37] Publishing message "SOM: 0" to topic "tutorial/topic"...
[12:11:37] Message published.
[12:11:37] Disconnecting from Solace message router...
[12:11:37] Disconnected.
[12:11:37] Received message: "SOM: 0", details:
Destination:                            [Topic tutorial/topic]
Class Of Service:                       COS1
DeliveryMode:                           DIRECT
Binary Attachment:                      len=6
  53 4f 4d 3a 20 30                                     SOM:.0

eventStatus: false

1 Ответ

0 голосов
/ 23 мая 2019

Проблема решена.

Пара вещей, которые я пропустил: 1. У меня никогда не было сокетного соединения с нашим сервером, поэтому socketWrapper.start() никогда не вызывался. 2. Во время примитивного тестирования в SocketWrapper.js (то есть отправки события и объявления слушателя в одном и том же файле) я заметил, что слушатель был объявлен после того, как событие было отправлено. Из-за асинхронной природы node.js важно убедиться, что все выполняется синхронно, в нужном порядке.

...