Чередовать несколько потоков - PullRequest
0 голосов
/ 02 октября 2019

У меня есть список потоков List[Stream[_]], размер списка известен в начале функции, размер каждого потока равен n или n+1. Я хотел бы получить поток чередования, например

def myMagicFold[A](s: List[Stream[A]]): Stream[A]

val streams = List(Stream(1,1,1),Stream(2,2,2),Stream(3,3),Stream(4,4)) 

val result = myMagicFold(streams)

//result = Stream(1,2,3,4,1,2,3,4,1,2)

Я использую fs2.Stream. Мой первый дубль:

val result = streams.fold(fs2.Stream.empty){
   case (s1, s2) => s1.interleaveAll(s2)
}

// result = Stream(1, 4, 3, 4, 2, 3, 1, 2, 1, 2)

Я ищу решение, основанное на основных операциях (map, fold, ...)

Ответы [ 2 ]

3 голосов
/ 02 октября 2019

Ваше первоначальное предположение было хорошим, однако interleaveAll выравнивается слишком рано, поэтому вы не получите ожидаемый заказ. Вот код, который должен делать то, что вы пытаетесь достичь:


  def zipAll[F[_], A](streams: List[Stream[F, A]]): Stream[F, A] =
    streams
      .foldLeft[Stream[F, List[Option[A]]]](Stream.empty) { (acc, s) =>
        zipStreams(acc, s)
      }
      .flatMap(l => Stream.emits(l.reverse.flatten))

  def zipStreams[F[_], A](s1: Stream[F, List[Option[A]]], s2: Stream[F, A]): Stream[F, List[Option[A]]] =
    s1.zipAllWith(s2.map(Option(_)))(Nil, Option.empty[A]) { case (acc, a) => a :: acc }

В этом случае вы добавляете n-й элемент каждого потока в список, а затем конвертируете в Stream, который позжесплющен к потоку результата. Так как fs2.Stream основан на пулле, у вас есть только один список в памяти за раз.

1 голос
/ 02 октября 2019

Вот попытка, она работает, как и ожидалось ...

import cats.effect.IO
import cats.implicits._
import fs2.Stream

def myMagicFold[A](streams: List[Stream[IO, A]]): Stream[IO, A] =
  Stream.unfoldEval(streams) { streams =>
    streams.traverse { stream =>
      stream.head.compile.last
    } map { list =>
      list.sequence.map { chunk =>
        Stream.emits(chunk) -> list.map(_.tail)
      }
    }
  }.flatten

Однако это далеко не хорошее решение, оно крайне неэффективно, поскольку переоценивает каждый Поток на каждом шаге.
Вы можете подтвердить это с помощью этого кода:

def stream(name: String, n: Int, value: Int): Stream[IO, Int] =
  Stream
    .range(start = 0, stopExclusive = n)
    .evalMap { i =>
      IO {
        println(s"${name} - ${i}")
        value
      }
    }

val list = List(stream("A", 3, 1), stream("B", 2, 2), stream("C", 3, 3))
myMagicFold(list).compile.toList.unsafeRunAsync(println)

, который напечатает

A - 0
B - 0
C -0
A - 0
A - 1
B - 0
B - 1
C - 0
C - 1
A - 0
A - 1
A - 2
B - 0
B - 1
C - 0
C - 1
C - 2

Справа (Список (1, 2, 3), 1, 2, 3))

Я почти уверен, что это можно исправить с помощью Pulls , но у меня нет никакого опыта с этим.

...