Как застегнуть наблюдаемые с повторением более короткой последовательности - PullRequest
0 голосов
/ 08 января 2020

Я пытаюсь выяснить, как добиться следующего результата:

A: -a--b--c-d--e-f-|
B: --1-2-3-|
=: --a-b--c-d--e-f-|
 : --1-2--3-1--2-3

, где A, B - входные потоки, а '=' - выходной поток (как кортеж A, B).

Плюс наоборот:

A: -a-b-|
B: --1--2-3-4--5--6-7-|
=: --a--b-a-b--a--b-a-|
 : --1--2-3-4--5--6-7

Итак, в простом тексте - я ищу что-то, что ведет себя как zip-оператор, но с возможностью «воспроизведения» более коротких последовательностей в соответствует более длинному.

Есть идеи, как решить эту проблему?


Решение 1

Решение, предоставленное @ DanielT (с некоторыми проблемы)

extension ObservableType {
    public static func zipRepeat<A, B>(_ a: Observable<A>, _ b: Observable<B>) -> Observable<(A, B)> {
        return Observable.create { observer in
            var aa: [A] = []
            var aComplete = false
            var bb: [B] = []
            var bComplete = false
            let lock = NSRecursiveLock()
            let disposableA = a.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let ae):
                    aa.append(ae)
                    if bComplete {
                        observer.onNext((ae, bb[(aa.count - 1) % bb.count]))
                    }
                    else if bb.count == aa.count {
                        observer.onNext((aa.last!, bb.last!))
                    }
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    aComplete = true
                    if bComplete {
                        observer.onCompleted()
                    }
                }
            }
            let disposableB = b.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let be):
                    bb.append(be)
                    if aComplete {
                        observer.onNext((aa[(bb.count - 1) % aa.count], be))
                    }
                    else if bb.count == aa.count {
                        observer.onNext((aa.last!, bb.last!))
                    }
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    bComplete = true
                    if aComplete {
                        observer.onCompleted()
                    }
                }
            }
            return Disposables.create(disposableA, disposableB)
        }
    }
}

Решение 2

Мое собственное решение, основанное на ответах ниже - собственный оператор (HT @DanielT), но с более настоятельным подходом (HT @iamtimmo):

extension ObservableType {
    public static func zipRepeatCollected<A, B>(_ a: Observable<A>, _ b: Observable<B>) -> Observable<(A?, B?)> {
        return Observable.create { observer in

            var bufferA: [A] = []
            let aComplete = PublishSubject<Bool>()
            aComplete.onNext(false);

            var bufferB: [B] = []
            let bComplete = PublishSubject<Bool>()
            bComplete.onNext(false);

            let disposableA = a.subscribe { event in
                switch event {
                case .next(let valueA):
                    bufferA.append(valueA)
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    aComplete.onNext(true)
                    aComplete.onCompleted()
                }
            }

            let disposableB = b.subscribe { event in
                switch event {
                case .next(let value):
                    bufferB.append(value)
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    bComplete.onNext(true)
                    bComplete.onCompleted()
                }
            }

            let disposableZip = Observable.zip(aComplete, bComplete)
                .filter { $0 == $1 && $0 == true }
                .subscribe { event in
                    switch event {
                    case .next(_, _):
                        var zippedList = Array<(A?, B?)>()

                        let lengthA = bufferA.count
                        let lengthB = bufferB.count

                        if lengthA > 0 && lengthB > 0 {
                            for i in 0 ..< max(lengthA, lengthB) {
                                let aVal = bufferA[i % lengthA]
                                let bVal = bufferB[i % lengthB]
                                zippedList.append((aVal, bVal))
                            }
                        } else if lengthA == 0 {
                            zippedList = bufferB.map { (nil, $0) }
                        } else {
                            zippedList = bufferA.map { ($0, nil) }
                        }

                        zippedList.forEach { observer.onNext($0) }
                    case .completed:
                        observer.onCompleted()
                    case .error(let e):
                        observer.onError(e)
                    }
            }

            return Disposables.create(disposableA, disposableB, disposableZip)
        }
    }
}

class ZipRepeatTests: XCTestCase {
    func testALongerThanB() {
        assertAopBEqualsE(
            a: "-a--b--c-d--e-f-|",
            b: "--1-2-3-|",
            e: "a1,b2,c3,d1,e2,f3,|")
    }

    func testAShorterThanB() {
        assertAopBEqualsE(
            a: "-a--b|",
            b: "--1-2-3-|",
            e: "a1,b2,a3,|")
    }
    func testBStartsLater() {
        assertAopBEqualsE(
            a: "-a--b|",
            b: "----1---2|",
            e: "a1,b2,|")

    }
    func testABWithConstOffset() {
        assertAopBEqualsE(
            a: "-a--b--c|",
            b: "----1--2--3--|",
            e: "a1,b2,c3,|")
    }

    func testAEndsBeforeBStarts() {
        assertAopBEqualsE(
            a: "ab|",
            b: "---1-2-3-4-|",
            e: "a1,b2,a3,b4,|")
    }

    func testACompletesWithoutValue() {
        assertAopBEqualsE(
            a: "-|",
            b: "--1-2-3-|",
            e: "1,2,3,|")
    }
    func testBCompletesWithoutValue() {
        assertAopBEqualsE(
            a: "-a--b|",
            b: "|",
            e: "a,b,|")
    }
    func testNoData() {
        assertAopBEqualsE(
            a: "-|",
            b: "|",
            e: "|")
    }

    func assertAopBEqualsE(_ scheduler: TestScheduler = TestScheduler(initialClock: 0), a: String, b: String, e: String, file: StaticString = #file, line: UInt = #line) {

        let aStream = scheduler.createColdObservable(events(a))
        let bStream = scheduler.createColdObservable(events(b))
        let eStream = expected(e)

        let bResults = scheduler.start {
            Observable<(String)>.zipRepeatCollected(aStream.asObservable(), bStream.asObservable()).map { "\($0 ?? "")\($1 ?? "")" }
        }
        XCTAssertEqual(eStream, bResults.events.map { $0.value }, file: file, line: line)
    }
    func expected(_ stream: String) -> [Event<String>] {
        stream.split(separator: ",").map { String($0) == "|" ? .completed : .next(String($0)) }
    }
    func events(_ stream: String, step: Int = 10) -> [Recorded<Event<String>>] {
        var time = 0
        var events = [Recorded<Event<String>>]()
        stream.forEach { c in
            if c == "|" {
                events.append(.completed(time))
            } else if c != "-" {
                events.append(.next(time, String(c)))
            }
            time += step
        }
        return events
    }
}

Ответы [ 3 ]

1 голос
/ 09 января 2020
  • Swift 5.1 / Combine *

Hi @ DegeH,

Я не знаю RxSwift, но вот кое-что, что может вам помочь. Он использует новый фреймворк Combine, но вы можете подключиться к RxSwift. Просто вставьте его на игровую площадку.

В этом примере pubA и pubB собирают строки из потоков A и B соответственно и генерируют список этих строк. Когда оба потока завершены, publisher объединяет два списка (через zip()) в список.

Списки разделяются на части и передаются в функцию zipUnalignedPublishersLists, которая преобразует их в один список кортежей. Именно здесь значения из более короткого потока повторяются, чтобы выровнять значения более длинного. Поэтому, возможно, эта функция поможет вам больше всего.

Наконец, этот список кортежей flatMap передан издателю последовательности, на который подписан subscriber.

DispatchQueue.main.asyncAfter() вызов в конце - это просто гарантия того, что выполнение не закончится до того, как subscriber успеет завершить на игровой площадке Вы не хотите это в своем заявлении.

import Foundation
import Combine

func zipUnalignedPublishersLists(_ listA: [String], _ listB: [String]) -> [(String, String)] {
    var zippedList = Array<(String, String)>()

    let lengthA = listA.count
    let lengthB = listB.count
    for i in 0 ..< max(lengthA, lengthB) {
        let aVal = listA[i % lengthA]
        let bVal = listB[i % lengthB]
        zippedList.append( (aVal, bVal) )
    }

    return zippedList
}

let ptsA = PassthroughSubject<String, Never>()
let ptsB = PassthroughSubject<String, Never>()

let pubA = ptsA.collect().eraseToAnyPublisher()
let pubB = ptsB.collect().eraseToAnyPublisher()

let publisher = pubA
    .zip(pubB)
    .map({ listA, listB in
        zipUnalignedPublishersLists(listA, listB)
    })
    .flatMap { $0.publisher }
    .eraseToAnyPublisher()

let subscriber = publisher.sink(receiveValue: { print($0) })

ptsA.send("a")
ptsB.send("1")
ptsA.send("b")
ptsB.send("2")
ptsA.send("c")
ptsB.send("3")
ptsB.send(completion: .finished)
ptsA.send("d")
ptsA.send("e")
ptsA.send("f")
ptsA.send(completion: .finished)

DispatchQueue.main.asyncAfter(deadline: .now() + 0.5) {}

1 голос
/ 09 января 2020

Если вы сомневаетесь, вы всегда можете сделать свой собственный оператор:

extension ObservableType {
    public static func zipRepeat<A, B>(_ a: Observable<A>, _ b: Observable<B>) -> Observable<(A, B)> {
        return Observable.create { observer in
            var aa: [A] = []
            var aComplete = false
            var bb: [B] = []
            var bComplete = false
            let lock = NSRecursiveLock()
            let disposableA = a.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let ae):
                    aa.append(ae)
                    if bComplete {
                        observer.onNext((ae, bb[(aa.count - 1) % bb.count]))
                    }
                    else if bb.count == aa.count {
                        observer.onNext((aa.last!, bb.last!))
                    }
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    aComplete = true
                    if bComplete {
                        observer.onCompleted()
                    }
                }
            }
            let disposableB = b.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let be):
                    bb.append(be)
                    if aComplete {
                        observer.onNext((aa[(bb.count - 1) % aa.count], be))
                    }
                    else if bb.count == aa.count {
                        observer.onNext((aa.last!, bb.last!))
                    }
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    bComplete = true
                    if aComplete {
                        observer.onCompleted()
                    }
                }
            }
            return Disposables.create(disposableA, disposableB)
        }
    }
}

И тест, показывающий функциональность:

class RxSandboxTests: XCTestCase {

    func testLongA() {
        let scheduler = TestScheduler(initialClock: 0)
        let a = scheduler.createColdObservable([.next(10, "a"), .next(20, "b"), .next(30, "c"), .next(40, "d"), .next(50, "e"), .next(60, "f"), .completed(60)])
        let b = scheduler.createColdObservable([.next(10, 1), .next(20, 2), .next(30, 3), .completed(30)])

        let bResults = scheduler.start {
            Observable<(String, Int)>.zipRepeat(a.asObservable(), b.asObservable()).map { $0.1 }
        }

        XCTAssertEqual(bResults.events, [.next(210, 1), .next(220, 2), .next(230, 3), .next(240, 1), .next(250, 2), .next(260, 3), .completed(260)])
    }

    func testLongB() {
        let scheduler = TestScheduler(initialClock: 0)
        let a = scheduler.createColdObservable([.next(10, "a"), .next(20, "b"), .next(30, "c"), .completed(30)])
        let b = scheduler.createColdObservable([.next(10, 1), .next(20, 2), .next(30, 3), .next(40, 4), .next(50, 5), .next(60, 6), .completed(60)])

        let aResults = scheduler.start {
            Observable<(String, Int)>.zipRepeat(a.asObservable(), b.asObservable()).map { $0.0 }
        }

        XCTAssertEqual(aResults.events, [.next(210, "a"), .next(220, "b"), .next(230, "c"), .next(240, "a"), .next(250, "b"), .next(260, "c"), .completed(260)])
    }
}
0 голосов
/ 17 марта 2020

Здесь играют две ортогональные компоненты:

  1. Учитывая конечный список элементов, преобразуйте их в конечную последовательность циклических элементов. Это происходит довольно часто, поэтому я реализовал CycleSequence тип , который оборачивает массив элементов и распределяет их элементы по порядку, повторяя навсегда.
  2. Способ объединения эта бесконечно циклическая последовательность с наблюдаемыми потоковыми элементами, связывающая каждый новый элемент с соответствующим членом из циклической последовательности.

С концептуальной точки зрения важно понимать, что циклическая последовательность не является наблюдаемый. Нам нужно действовать в «притягивающем» духе. Когда мы хотим следующий элемент в циклической последовательности, мы спрашиваем его (вызывая его реализацию IteratorProtocol.next()). Это не похоже на Observable s, которые имеют свою собственную хронологию, и элементы "pu sh" через свою цепочку операторов.

Таким образом, это вопрос map ping каждого элемента наблюдаемого с элементом next() из циклической последовательности, например:

var cyclingIterator = CycleSequence(cycling: ["a", "b", "c"]).makeIterator()

Observable.from(1...100)
    .asObservable()
    .map { ($0, cyclingIterator.next()) }
    .subscribe(onNext: { print($0) })

печатает:

(1, Optional("?"))
(2, Optional("?"))
(3, Optional("?"))
(4, Optional("?"))
(5, Optional("?"))
(6, Optional("?"))
(7, Optional("?"))
(8, Optional("?"))
(9, Optional("?"))
(10, Optional("?"))
...
...