Как я могу реализовать логику канала c в моем WebSocket, включая события, к которым я могу привязаться? - PullRequest
0 голосов
/ 30 марта 2020

Передо мной стоит очень большая проблема: мой собственный сервер WebSocket. Я уже запрограммировал аутентификацию и все детали так, чтобы сервер выполнял и обрабатывал запросы, но в настоящее время я застрял с реальным logi c.

Для этого я написал класс в JavaScript, который должен называться следующим образом:

let token       = "ABC",
    myWebSocketClient = new MyWebSocketClient(token);

myWebSocketClient.subscribe("cWhFVEtlOENNM0VwNmlrUlBmOVF1UT09").bind("create_browser_push", function (response) {
    console.log(response);
});

Идея состоит в том, что я могу подписаться на специальный канал с .subscribe() и прослушайте специальное событие с .bind(). Для этого я нашел этот файл в качестве своей ориентации, но это немного для меня и моих потребностей:

https://github.com/pusher/pusher-js/blob/0d718adf06566766e0f544d74565d2ff3dcc0ff8/src/core/pusher.ts#L254

Мой MyWebSocketClient класс выглядит так

class MyWebSocketClient {
    constructor(token) {
        this.url   = "ws://localhost:8181";
        this.token = token;

        this.connect();
    }

    connect() {
        let self = this,
            ws   = new WebSocket(self.url);

        ws.onopen = function () {
            let authMsg = {
                type: "authenticate",
                payload: {token: self.token}
            };

            ws.send(JSON.stringify(authMsg));
        };
    }

    subscribe(channelName) {

    }

    bind(eventName) {

    }
}

Мой клиент подключается и аутентифицируется с моим WebSocket Server в NodeJS, что делает меня по-настоящему счастливым. Это просто, но работает:

const WebSocket  = require('ws').Server;
const express    = require('express');
const bodyParser = require('body-parser');

const app = express();
const wss = new WebSocket({
    server: app.listen(8181)
});

let wsObj = null;

app.use(bodyParser.json());

wss.on('connection', ws => {
    wsObj = ws;

    ws.on('message', toEvent).on('authenticate', function (data) {
        const receivedToken = data.token;

        if (!receivedToken || receivedToken === '') {
            return ws.close(4000, 'Bad Request');
        }

        verifyToken(receivedToken).then(() => {
            //User is authenticated and connected
        }).catch((err) => {
            if (err === 401) {
                return ws.close(4010, 'Unauthorized');
            } else {
                return ws.close(5000, 'Internal Server Error');
            }
        });
    });
});

app.post('/', function (req, res) {
    const receivedToken = req.query.token;

    if (!receivedToken || receivedToken === '') {
        return res.sendStatus(400).end();
    }

    verifyToken(receivedToken).then(() => {
        //Request is valid and can be handled / maybe we need the wsObj here

        console.log(req.body);
        //Result of a request:

        //{
        //    name: 'create_browser_push', <- The event
        //    data: 'null', <- Should be sent back to the client to be accessible over the "response" parameter
        //    channels: [ 'cWhFVEtlOENNM0VwNmlrUlBmOVF1UT09' ]
        //}

    }).catch(() => {
        return res.sendStatus(401).end();
    });
});

function toEvent(message) {
    try {
        const event = JSON.parse(message);

        this.emit(event.type, event.payload);
    } catch (err) {
        console.log('Not an event: ', err);
    }
}

function verifyToken(token) {
    return new Promise(function (resolve, reject) {
        if (token === 'ABC') {
            resolve();
        } else {
            reject(401);
        }
    });
}

У кого-то есть идея, как мне сделать это простым способом? Так что, когда я отправлял что-то из бэкэнда, что пользователь, подключенный к каналу и bindet к событию, получает уведомление?

И возможно ли каким-то образом, что сервер знает, какой клиент подписан на указанную c канал / событие, чтобы мне не нужно было проверять его на стороне клиента (если это имеет смысл)?

Большое спасибо за вашу помощь!

Ответы [ 2 ]

1 голос
/ 30 марта 2020

Для каждого подключенного клиента при connection вызове события:

  1. Создание уникального идентификатора (ID) для соединения;
  2. Сохранение объекта сокета в глобальном объекте.

Для каждого subscribe вызова события:

  1. Сохранить идентификатор подключения к глобальному объекту, отслеживая все подписки на каналы.

Итак при публикации данных в канал:

  1. Фильтрация всех идентификаторов активных подключений, подписанных на этот канал;
  2. Получение всех объектов подключения для этих идентификаторов;
  3. Использование объектов подключения отправить сообщение.

Надеюсь, это поможет.

0 голосов
/ 30 марта 2020

У меня ограниченный JavaScript опыт, так как все мои проекты в основном Python / Django, но я действительно просто искал JavaScript, когда это было необходимо. Поэтому я использую этот скрипт, который передает активность моим уведомлениям, например, новый лайк, комментарий, сообщение и т. Д. c. Кроме того, я думаю, что вы можете создать веб-сокет, запустив npm run browserify. Взгляните на этот код, который есть в моем проекте. Надеюсь, ты сможешь что-то из этого извлечь.

(function(f){if(typeof exports==="object"&&typeof module!=="undefined"){module.exports=f()}else if(typeof define==="function"&&define.amd){define([],f)}else{var g;if(typeof window!=="undefined"){g=window}else if(typeof global!=="undefined"){g=global}else if(typeof self!=="undefined"){g=self}else{g=this}g.channels = f()}})(function(){var define,module,exports;return (function e(t,n,r){function s(o,u){if(!n[o]){if(!t[o]){var a=typeof require=="function"&&require;if(!u&&a)return a(o,!0);if(i)return i(o,!0);var f=new Error("Cannot find module '"+o+"'");throw f.code="MODULE_NOT_FOUND",f}var l=n[o]={exports:{}};t[o][0].call(l.exports,function(e){var n=t[o][1][e];return s(n?n:e)},l,l.exports,e,t,n,r)}return n[o].exports}var i=typeof require=="function"&&require;for(var o=0;o<r.length;o++)s(r[o]);return s})({1:[function(require,module,exports){
"use strict";
var isWebSocket = function (constructor) {
    return constructor && constructor.CLOSING === 2;
};
var isGlobalWebSocket = function () {
    return typeof WebSocket !== 'undefined' && isWebSocket(WebSocket);
};
var getDefaultOptions = function () { return ({
    constructor: isGlobalWebSocket() ? WebSocket : null,
    maxReconnectionDelay: 10000,
    minReconnectionDelay: 1500,
    reconnectionDelayGrowFactor: 1.3,
    connectionTimeout: 4000,
    maxRetries: Infinity,
    debug: false,
}); };
var bypassProperty = function (src, dst, name) {
    Object.defineProperty(dst, name, {
        get: function () { return src[name]; },
        set: function (value) { src[name] = value; },
        enumerable: true,
        configurable: true,
    });
};
var initReconnectionDelay = function (config) {
    return (config.minReconnectionDelay + Math.random() * config.minReconnectionDelay);
};
var updateReconnectionDelay = function (config, previousDelay) {
    var newDelay = previousDelay * config.reconnectionDelayGrowFactor;
    return (newDelay > config.maxReconnectionDelay)
        ? config.maxReconnectionDelay
        : newDelay;
};
var LEVEL_0_EVENTS = ['onopen', 'onclose', 'onmessage', 'onerror'];
var reassignEventListeners = function (ws, oldWs, listeners) {
    Object.keys(listeners).forEach(function (type) {
        listeners[type].forEach(function (_a) {
            var listener = _a[0], options = _a[1];
            ws.addEventListener(type, listener, options);
        });
    });
    if (oldWs) {
        LEVEL_0_EVENTS.forEach(function (name) { ws[name] = oldWs[name]; });
    }
};
var ReconnectingWebsocket = function (url, protocols, options) {
    var _this = this;
    if (options === void 0) { options = {}; }
    var ws;
    var connectingTimeout;
    var reconnectDelay = 0;
    var retriesCount = 0;
    var shouldRetry = true;
    var savedOnClose = null;
    var listeners = {};
    // require new to construct
    if (!(this instanceof ReconnectingWebsocket)) {
        throw new TypeError("Failed to construct 'ReconnectingWebSocket': Please use the 'new' operator");
    }
    // Set config. Not using `Object.assign` because of IE11
    var config = getDefaultOptions();
    Object.keys(config)
        .filter(function (key) { return options.hasOwnProperty(key); })
        .forEach(function (key) { return config[key] = options[key]; });
    if (!isWebSocket(config.constructor)) {
        throw new TypeError('Invalid WebSocket constructor. Set `options.constructor`');
    }
    var log = config.debug ? function () {
        var params = [];
        for (var _i = 0; _i < arguments.length; _i++) {
            params[_i - 0] = arguments[_i];
        }
        return console.log.apply(console, ['RWS:'].concat(params));
    } : function () { };
    /**
     * Not using dispatchEvent, otherwise we must use a DOM Event object
     * Deferred because we want to handle the close event before this
     */
    var emitError = function (code, msg) { return setTimeout(function () {
        var err = new Error(msg);
        err.code = code;
        if (Array.isArray(listeners.error)) {
            listeners.error.forEach(function (_a) {
                var fn = _a[0];
                return fn(err);
            });
        }
        if (ws.onerror) {
            ws.onerror(err);
        }
    }, 0); };
    var handleClose = function () {
        log('close');
        retriesCount++;
        log('retries count:', retriesCount);
        if (retriesCount > config.maxRetries) {
            emitError('EHOSTDOWN', 'Too many failed connection attempts');
            return;
        }
        if (!reconnectDelay) {
            reconnectDelay = initReconnectionDelay(config);
        }
        else {
            reconnectDelay = updateReconnectionDelay(config, reconnectDelay);
        }
        log('reconnectDelay:', reconnectDelay);
        if (shouldRetry) {
            setTimeout(connect, reconnectDelay);
        }
    };
    var connect = function () {
        log('connect');
        var oldWs = ws;
        ws = new config.constructor(url, protocols);
        connectingTimeout = setTimeout(function () {
            log('timeout');
            ws.close();
            emitError('ETIMEDOUT', 'Connection timeout');
        }, config.connectionTimeout);
        log('bypass properties');
        for (var key in ws) {
            // @todo move to constant
            if (['addEventListener', 'removeEventListener', 'close', 'send'].indexOf(key) < 0) {
                bypassProperty(ws, _this, key);
            }
        }
        ws.addEventListener('open', function () {
            clearTimeout(connectingTimeout);
            log('open');
            reconnectDelay = initReconnectionDelay(config);
            log('reconnectDelay:', reconnectDelay);
            retriesCount = 0;
        });
        ws.addEventListener('close', handleClose);
        reassignEventListeners(ws, oldWs, listeners);
        // because when closing with fastClose=true, it is saved and set to null to avoid double calls
        ws.onclose = ws.onclose || savedOnClose;
        savedOnClose = null;
    };
    log('init');
    connect();
    this.close = function (code, reason, _a) {
        if (code === void 0) { code = 1000; }
        if (reason === void 0) { reason = ''; }
        var _b = _a === void 0 ? {} : _a, _c = _b.keepClosed, keepClosed = _c === void 0 ? false : _c, _d = _b.fastClose, fastClose = _d === void 0 ? true : _d, _e = _b.delay, delay = _e === void 0 ? 0 : _e;
        if (delay) {
            reconnectDelay = delay;
        }
        shouldRetry = !keepClosed;
        ws.close(code, reason);
        if (fastClose) {
            var fakeCloseEvent_1 = {
                code: code,
                reason: reason,
                wasClean: true,
            };
            // execute close listeners soon with a fake closeEvent
            // and remove them from the WS instance so they
            // don't get fired on the real close.
            handleClose();
            ws.removeEventListener('close', handleClose);
            // run and remove level2
            if (Array.isArray(listeners.close)) {
                listeners.close.forEach(function (_a) {
                    var listener = _a[0], options = _a[1];
                    listener(fakeCloseEvent_1);
                    ws.removeEventListener('close', listener, options);
                });
            }
            // run and remove level0
            if (ws.onclose) {
                savedOnClose = ws.onclose;
                ws.onclose(fakeCloseEvent_1);
                ws.onclose = null;
            }
        }
    };
    this.send = function (data) {
        ws.send(data);
    };
    this.addEventListener = function (type, listener, options) {
        if (Array.isArray(listeners[type])) {
            if (!listeners[type].some(function (_a) {
                var l = _a[0];
                return l === listener;
            })) {
                listeners[type].push([listener, options]);
            }
        }
        else {
            listeners[type] = [[listener, options]];
        }
        ws.addEventListener(type, listener, options);
    };
    this.removeEventListener = function (type, listener, options) {
        if (Array.isArray(listeners[type])) {
            listeners[type] = listeners[type].filter(function (_a) {
                var l = _a[0];
                return l !== listener;
            });
        }
        ws.removeEventListener(type, listener, options);
    };
};
module.exports = ReconnectingWebsocket;

},{}],2:[function(require,module,exports){
'use strict';

Object.defineProperty(exports, "__esModule", {
  value: true
});
exports.WebSocketBridge = undefined;

var _extends = Object.assign || function (target) { for (var i = 1; i < arguments.length; i++) { var source = arguments[i]; for (var key in source) { if (Object.prototype.hasOwnProperty.call(source, key)) { target[key] = source[key]; } } } return target; };

var _createClass = function () { function defineProperties(target, props) { for (var i = 0; i < props.length; i++) { var descriptor = props[i]; descriptor.enumerable = descriptor.enumerable || false; descriptor.configurable = true; if ("value" in descriptor) descriptor.writable = true; Object.defineProperty(target, descriptor.key, descriptor); } } return function (Constructor, protoProps, staticProps) { if (protoProps) defineProperties(Constructor.prototype, protoProps); if (staticProps) defineProperties(Constructor, staticProps); return Constructor; }; }();

var _reconnectingWebsocket = require('reconnecting-websocket');

var _reconnectingWebsocket2 = _interopRequireDefault(_reconnectingWebsocket);

function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }

function _classCallCheck(instance, Constructor) { if (!(instance instanceof Constructor)) { throw new TypeError("Cannot call a class as a function"); } }

/**
 * Bridge between Channels and plain javascript.
 *
 * @example
 * const webSocketBridge = new WebSocketBridge();
 * webSocketBridge.connect();
 * webSocketBridge.listen(function(action, stream) {
 *   console.log(action, stream);
 * });
 */
var WebSocketBridge = function () {
  function WebSocketBridge(options) {
    _classCallCheck(this, WebSocketBridge);

    /**
     * The underlaying `ReconnectingWebSocket` instance.
     * 
     * @type {ReconnectingWebSocket}
     */
    this.socket = null;
    this.streams = {};
    this.default_cb = null;
    this.options = _extends({}, options);
  }

  /**
   * Connect to the websocket server
   *
   * @param      {String}  [url]     The url of the websocket. Defaults to
   * `window.location.host`
   * @param      {String[]|String}  [protocols] Optional string or array of protocols.
   * @param      {Object} options Object of options for [`reconnecting-websocket`](https://github.com/joewalnes/reconnecting-websocket#options-1).
   * @example
   * const webSocketBridge = new WebSocketBridge();
   * webSocketBridge.connect();
   */


  _createClass(WebSocketBridge, [{
    key: 'connect',
    value: function connect(url, protocols, options) {
      var _url = void 0;
      // Use wss:// if running on https://
      var scheme = window.location.protocol === 'https:' ? 'wss' : 'ws';
      var base_url = scheme + '://' + window.location.host;
      if (url === undefined) {
        _url = base_url;
      } else {
        // Support relative URLs
        if (url[0] == '/') {
          _url = '' + base_url + url;
        } else {
          _url = url;
        }
      }
      this.socket = new _reconnectingWebsocket2.default(_url, protocols, options);
    }

    /**
     * Starts listening for messages on the websocket, demultiplexing if necessary.
     *
     * @param      {Function}  [cb]         Callback to be execute when a message
     * arrives. The callback will receive `action` and `stream` parameters
     *
     * @example
     * const webSocketBridge = new WebSocketBridge();
     * webSocketBridge.connect();
     * webSocketBridge.listen(function(action, stream) {
     *   console.log(action, stream);
     * });
     */

  }, {
    key: 'listen',
    value: function listen(cb) {
      var _this = this;

      this.default_cb = cb;
      this.socket.onmessage = function (event) {
        var msg = JSON.parse(event.data);
        var action = void 0;
        var stream = void 0;

        if (msg.stream !== undefined) {
          action = msg.payload;
          stream = msg.stream;
          var stream_cb = _this.streams[stream];
          stream_cb ? stream_cb(action, stream) : null;
        } else {
          action = msg;
          stream = null;
          _this.default_cb ? _this.default_cb(action, stream) : null;
        }
      };
    }

    /**
     * Adds a 'stream handler' callback. Messages coming from the specified stream
     * will call the specified callback.
     *
     * @param      {String}    stream  The stream name
     * @param      {Function}  cb      Callback to be execute when a message
     * arrives. The callback will receive `action` and `stream` parameters.
      * @example
     * const webSocketBridge = new WebSocketBridge();
     * webSocketBridge.connect();
     * webSocketBridge.listen();
     * webSocketBridge.demultiplex('mystream', function(action, stream) {
     *   console.log(action, stream);
     * });
     * webSocketBridge.demultiplex('myotherstream', function(action, stream) {
     *   console.info(action, stream);
     * });
     */

  }, {
    key: 'demultiplex',
    value: function demultiplex(stream, cb) {
      this.streams[stream] = cb;
    }

    /**
     * Sends a message to the reply channel.
     *
     * @param      {Object}  msg     The message
     *
     * @example
     * webSocketBridge.send({prop1: 'value1', prop2: 'value1'});
     */

  }, {
    key: 'send',
    value: function send(msg) {
      this.socket.send(JSON.stringify(msg));
    }

    /**
     * Returns an object to send messages to a specific stream
     *
     * @param      {String}  stream  The stream name
     * @return     {Object}  convenience object to send messages to `stream`.
     * @example
     * webSocketBridge.stream('mystream').send({prop1: 'value1', prop2: 'value1'})
     */

  }, {
    key: 'stream',
    value: function stream(_stream) {
      var _this2 = this;

      return {
        send: function send(action) {
          var msg = {
            stream: _stream,
            payload: action
          };
          _this2.socket.send(JSON.stringify(msg));
        }
      };
    }
  }]);

  return WebSocketBridge;
}();

exports.WebSocketBridge = WebSocketBridge;

},{"reconnecting-websocket":1}]},{},[2])(2)
});
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...