Node-Serialport: невозможно обработать входящие данные с помощью RxJS Observable при применении тайм-аута - PullRequest
0 голосов
/ 05 января 2019

В настоящее время я работаю над проектом NodeJS, использующим модуль serialport в сочетании с RxJS Observables . Предполагаемый «поток» / вариант использования следующий:

  1. имя последовательного порта portName отправляется через последовательный порт
  2. поскольку RxD и TxD связаны друг с другом, данные «отражаются» на аппаратной стороне
  3. данные считываются через последовательный порт
  4. входящие данные обрабатываются readline-parser и передаются в RxJS Observable
  5. , если чтение data равно ранее отправленному portName, наблюдаемое больше не требуется и будет завершено к observer.complete()

Мне удалось реализовать вышеупомянутый поток, но мне нужно сделать некоторые дополнительные реализации, такие как

  1. тайм-аут, если данные не получены в течение определенного периода времени
  2. повторная попытка отправить команду снова в случае тайм-аута или других ошибок

Я работаю над реализацией тайм-аута и попробовал обе NodeJS 'setTimeout() и RxJS' собственную timeout функцию . При применении любой функции тайм-аута данные, по-видимому, не считываются / извлекаются последовательным портом, что, в свою очередь, вызывает ошибку тайм-аута.

Предполагая, что данных нет, на первый взгляд это кажется вполне приемлемым, поскольку время ожидания делает то, что должно. Однако мне удалось дважды проверить, что нужные данные отправляются в порт, используя не только программный эмулируемый последовательный порт, но и два преобразователя USB-to-Serial CP2102 (см. Комментарии в коде для получения дополнительной информации). ):

'use strict';
const Rx = require('rxjs');
const { interval } = require('rxjs');
const { timeout } = require('rxjs/operators');
const SerialPort = require('serialport');
const Readline = require('@serialport/parser-readline');


// `tries` is needed for later implementation of communcation retries
const myObservable = (portName, tries) => {
  const port = new SerialPort(portName);
  const parser = port.pipe( new Readline() );
  port.write(`${portName}\n`);

  return Rx.Observable
    .create( (observer) => {
      parser.on('data', (data) => {
        observer.next(data);
        console.log(`Detection will be: ${data == portName} (${data} vs. ${portName})`);
        if (data == portName)
        {
          port.close( (err) => {
            if (err)
            {
              console.log(`Error on closing serial port: ${err}`);
              observer.error(err);
            }
          });
          observer.complete();
        }
      })
    })
    // `timeout` is needed for later implementation of communication timeout, see comment at end of code
    // .pipe(timeout(10000))
}


const myObserver = {
  next:     x => console.log(`got data from observable: ${x}`),
  error:    err => console.error(`something wrong occurred: ${err}`),
  complete: () => console.log('done'),
};


console.log('before subscribe');
const sub = myObservable('/dev/tty.usbserial-FTG7L3FX', null).subscribe(myObserver);
// double-checked that data is sent by using software (created an emulated pair of virtual serial ports with `socat -d -d pty,raw,echo=0 pty,raw,echo=0`)
// --> data is sent, but not read/retrieved when either using `setTimeout` or RxJS' own `timeout()`
// const sub = myObservable('/dev/ttys003', null).subscribe(myObserver);
// double-checked that data is sent by using hardware interfaces (used two CP2102 modules with pairwise-crossed RxD and TxD)
// --> data is sent, but not read/retrieved when either using `setTimeout` or RxJS' own `timeout()`
// const sub = myObservable('/dev/tty.SLAB_USBtoUART', null).subscribe(myObserver);
console.log('after subscribe');


// when commenting the following `setTimeout()` data is retrieved, but does not work with `setTimeout()`
// so tried to use RxJS' `timeout()` operator --> not working either
// setTimeout(() => {
//   sub.unsubscribe();
//   console.log('unsubscribed');
// }, 10000);

Что мне здесь не хватает? Почему данные отправляются, когда не используется какой-либо таймаут, а не при применении функции тайм-аута?


Обновления в связи с дальнейшим расследованием:

  1. Когда применяется timeout(), данные отправляются после истечения времени ожидания, что означает, что время ожидания запускает как отправку данных, так и выход из наблюдаемого, поскольку оно истекло. Таким образом, .timeout(), кажется, не применяется к возвращенному Rx.Observable, но ко всей функции myObservable.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...