У changefeeds есть такой же фильтр и есть тот же курсор? - PullRequest
0 голосов
/ 09 июня 2018

Код:

const WebSocket = require('ws')
const r = require('rethinkdb')

async function start () {
  try {
    /**
     * Connect database
     */
    const conn = await r.connect()
    conn.use('app')

    /**
     * Create socket server
     */
    const wss = new WebSocket.Server({ port: 8082 })
    wss.on('connection', (client) => {
      /**
       * Client setting subscribe rethinkdb data
       * WARNING : doesn't validate payload
       */
      client.on('message', (payload) => {
        const filterData = JSON.parse(payload)

        /**
         * Send all data base on filter
         *

        /**
         * Subscribe rethink data base on message request
         * Send all data base on update
         */
        r.table('foo')
          .changes({
            'includeTypes': true,
            'includeInitial': true,
            "squash": true // Squash many changes into one event
          })
          .run(conn, async (err, cursor) => {
            if (err) {
              throw err
            }

            client.cursor = cursor
            cursor.each((err, row) => {
              if (err) {
                throw err
              }

              /**
               * Check if client is connected
               * Else close cursor
               *
               * Also useful for check if connect is dead
               */
              if (client.readyState !== client.OPEN) {
                client.terminate()
                client.cursor.close()
                return
              }

              client.send(JSON.stringify(row))
            })

            /**
             * Close stream when :
             * Client disconnect
             * Client send new message
             */

            client.on('message', () => {
              cursor.close()
            })
          })
      })
    })
  } catch (err) {
    console.log(err)
  }
}

start()

Ошибка

    ReqlDriverError: Cursor is closed.
    at ReqlDriverError.ReqlError [as constructor] (d:\project\vietnam-traffic-map\websocket\node_modules\rethinkdb\errors.js:23:13)
    at new ReqlDriverError (d:\project\vietnam-traffic-map\websocket\node_modules\rethinkdb\errors.js:68:50)
    at Feed.IterableResult._promptNext (d:\project\vietnam-traffic-map\websocket\node_modules\rethinkdb\cursor.js:128:10)
    at d:\project\vietnam-traffic-map\websocket\node_modules\rethinkdb\cursor.js:200:22
    at tryCatcher (d:\project\vietnam-traffic-map\websocket\node_modules\bluebird\js\main\util.js:26:23)
    at Function.Promise.fromNode (d:\project\vietnam-traffic-map\websocket\node_modules\bluebird\js\main\promise.js:168:30)
    at Feed.<anonymous> (d:\project\vietnam-traffic-map\websocket\node_modules\rethinkdb\cursor.js:203:20)
    at Feed.<anonymous> (d:\project\vietnam-traffic-map\websocket\node_modules\rethinkdb\util.js:43:16)
    at d:\project\vietnam-traffic-map\websocket\node_modules\rethinkdb\cursor.js:262:24
    at tryCatcher (d:\project\vietnam-traffic-map\websocket\node_modules\bluebird\js\main\util.js:26:23)
    at Promise.successAdapter (d:\project\vietnam-traffic-map\websocket\node_modules\bluebird\js\main\nodeify.js:23:30)
    at Promise._settlePromiseAt (d:\project\vietnam-traffic-map\websocket\node_modules\bluebird\js\main\promise.js:582:21)
    at Promise._settlePromises (d:\project\vietnam-traffic-map\websocket\node_modules\bluebird\js\main\promise.js:700:14)
    at Async._drainQueue (d:\project\vietnam-traffic-map\websocket\node_modules\bluebird\js\main\async.js:123:16)
    at Async._drainQueues (d:\project\vietnam-traffic-map\websocket\node_modules\bluebird\js\main\async.js:133:10)
    at Immediate.Async.drainQueues (d:\project\vietnam-traffic-map\websocket\node_modules\bluebird\js\main\async.js:15:14)

Воспроизвести проблему: :

  • Подключите один
  • Подключитесь снова (предыдущее соединение закрыто, но его событие не было закрыто)
  • Изменить данные в rethinkdb (в этот момент я сделал точку останова воператор if)
  • Первое соединение закрыто -> начать закрывать курсор и остановить выполнение далее
  • Второе не было закрыто
  • Ошибка: курсорбыл закрыт

Вопрос: Имеет ли changefeeds тот же фильтр, также есть тот же курсор?У каждого клиента websocket должен быть свой курсор изменения.Почему я закрываю первое гнездо, а затем второе тоже близко?

1 Ответ

0 голосов
/ 09 июня 2018

Изменить это

if (client.readyState !== client.OPEN) {
    client.terminate()
    client.cursor.close()
    return
}

client.send(JSON.stringify(row))

На это:

if (client.readyState === client.OPEN) {
  client.send(JSON.stringify(row))
} else {
  await cursor.close()
  client.terminate()
} 

Исправить мою проблему.Я все еще получаю ошибку: ReqlDriverError: Курсор закрыт. в попытке поймать ...

...