Дарт: Как создать поток, который объединяет события из другого потока? - PullRequest
0 голосов
/ 19 апреля 2020

Каков наилучший способ создания потока, который должен объединять несколько событий из другого потока?

Моя цель - создать поток, который агрегирует события из другого потока, пока у него не будет достаточно событий для создания сообщения. В моем случае я читаю данные из потока Socket, поэтому сообщение может быть распределено по разным событиям, а событие может содержать данные для разных сообщений, поэтому я не могу просто применить операцию map к каждому элементу.

Кажется, правильным будет использовать потоковый трансформатор, но у меня возникают проблемы с поиском информации о том, как правильно его реализовать и без лишнего шаблонного кода.

Я придумал решение после прочтения о том, как создавать потоки, но я не уверен, является ли это приемлемым или лучшим способом сделать это.

Вот мой пример решения:

Stream<String> joinWordsIfStartWithC(Stream<String> a) async* {
  var prevWord= '';
  await for (var i in a) {
    prevWord += i;
    if(i.startsWith('C')){
      yield prevWord;
      prevWord = '';
    }
  }
}

Stream<String> periodicStream(Duration interval) async* {
  while (true) {
    await Future.delayed(interval);
    yield 'C';
    yield 'A';
    yield 'B';
    yield 'C';
    yield 'C';
    yield 'B';
    yield 'C';
  }
}

void main(List<String> arguments) async {
  var intStream = periodicStream(Duration(seconds: 2));

  var sStream = joinWordsIfStartWithC(intStream);

  sStream.listen((s) => print(s));
}

1 Ответ

0 голосов
/ 21 апреля 2020

Я скажу, что ваше решение кажется хорошим, но если вы хотите создать потоковый преобразователь, его достаточно просто расширить с StreamTransformerBase:

import 'dart:async';

class JoinWordsIfStartWithCTransformer extends StreamTransformerBase<String, String> {
  Stream<String> bind(Stream<String> a) async* {
    var prevWord = '';
    await for (var i in a) {
      prevWord += i;
      if (i.startsWith('C')) {
        yield prevWord;
        prevWord = '';
      }
    }
  }
}

Stream<String> periodicStream(Duration interval) async* {
  while (true) {
    await Future.delayed(interval);
    yield 'C';
    yield 'A';
    yield 'B';
    yield 'C';
    yield 'C';
    yield 'B';
    yield 'C';
  }
}

void main(List<String> arguments) async {
  var intStream = periodicStream(Duration(seconds: 2));

  var sStream = intStream.transform(JoinWordsIfStartWithCTransformer());

  sStream.listen((s) => print(s));
}
...