Я пытаюсь выяснить, как добиться следующего результата:
A: -a--b--c-d--e-f-|
B: --1-2-3-|
=: --a-b--c-d--e-f-|
: --1-2--3-1--2-3
, где A, B - входные потоки, а '=' - выходной поток (как кортеж A, B).
Плюс наоборот:
A: -a-b-|
B: --1--2-3-4--5--6-7-|
=: --a--b-a-b--a--b-a-|
: --1--2-3-4--5--6-7
Итак, в простом тексте - я ищу что-то, что ведет себя как zip-оператор, но с возможностью «воспроизведения» более коротких последовательностей в соответствует более длинному.
Есть идеи, как решить эту проблему?
Решение 1
Решение, предоставленное @ DanielT (с некоторыми проблемы)
extension ObservableType {
public static func zipRepeat<A, B>(_ a: Observable<A>, _ b: Observable<B>) -> Observable<(A, B)> {
return Observable.create { observer in
var aa: [A] = []
var aComplete = false
var bb: [B] = []
var bComplete = false
let lock = NSRecursiveLock()
let disposableA = a.subscribe { event in
lock.lock(); defer { lock.unlock() }
switch event {
case .next(let ae):
aa.append(ae)
if bComplete {
observer.onNext((ae, bb[(aa.count - 1) % bb.count]))
}
else if bb.count == aa.count {
observer.onNext((aa.last!, bb.last!))
}
case .error(let error):
observer.onError(error)
case .completed:
aComplete = true
if bComplete {
observer.onCompleted()
}
}
}
let disposableB = b.subscribe { event in
lock.lock(); defer { lock.unlock() }
switch event {
case .next(let be):
bb.append(be)
if aComplete {
observer.onNext((aa[(bb.count - 1) % aa.count], be))
}
else if bb.count == aa.count {
observer.onNext((aa.last!, bb.last!))
}
case .error(let error):
observer.onError(error)
case .completed:
bComplete = true
if aComplete {
observer.onCompleted()
}
}
}
return Disposables.create(disposableA, disposableB)
}
}
}
Решение 2
Мое собственное решение, основанное на ответах ниже - собственный оператор (HT @DanielT), но с более настоятельным подходом (HT @iamtimmo):
extension ObservableType {
public static func zipRepeatCollected<A, B>(_ a: Observable<A>, _ b: Observable<B>) -> Observable<(A?, B?)> {
return Observable.create { observer in
var bufferA: [A] = []
let aComplete = PublishSubject<Bool>()
aComplete.onNext(false);
var bufferB: [B] = []
let bComplete = PublishSubject<Bool>()
bComplete.onNext(false);
let disposableA = a.subscribe { event in
switch event {
case .next(let valueA):
bufferA.append(valueA)
case .error(let error):
observer.onError(error)
case .completed:
aComplete.onNext(true)
aComplete.onCompleted()
}
}
let disposableB = b.subscribe { event in
switch event {
case .next(let value):
bufferB.append(value)
case .error(let error):
observer.onError(error)
case .completed:
bComplete.onNext(true)
bComplete.onCompleted()
}
}
let disposableZip = Observable.zip(aComplete, bComplete)
.filter { $0 == $1 && $0 == true }
.subscribe { event in
switch event {
case .next(_, _):
var zippedList = Array<(A?, B?)>()
let lengthA = bufferA.count
let lengthB = bufferB.count
if lengthA > 0 && lengthB > 0 {
for i in 0 ..< max(lengthA, lengthB) {
let aVal = bufferA[i % lengthA]
let bVal = bufferB[i % lengthB]
zippedList.append((aVal, bVal))
}
} else if lengthA == 0 {
zippedList = bufferB.map { (nil, $0) }
} else {
zippedList = bufferA.map { ($0, nil) }
}
zippedList.forEach { observer.onNext($0) }
case .completed:
observer.onCompleted()
case .error(let e):
observer.onError(e)
}
}
return Disposables.create(disposableA, disposableB, disposableZip)
}
}
}
class ZipRepeatTests: XCTestCase {
func testALongerThanB() {
assertAopBEqualsE(
a: "-a--b--c-d--e-f-|",
b: "--1-2-3-|",
e: "a1,b2,c3,d1,e2,f3,|")
}
func testAShorterThanB() {
assertAopBEqualsE(
a: "-a--b|",
b: "--1-2-3-|",
e: "a1,b2,a3,|")
}
func testBStartsLater() {
assertAopBEqualsE(
a: "-a--b|",
b: "----1---2|",
e: "a1,b2,|")
}
func testABWithConstOffset() {
assertAopBEqualsE(
a: "-a--b--c|",
b: "----1--2--3--|",
e: "a1,b2,c3,|")
}
func testAEndsBeforeBStarts() {
assertAopBEqualsE(
a: "ab|",
b: "---1-2-3-4-|",
e: "a1,b2,a3,b4,|")
}
func testACompletesWithoutValue() {
assertAopBEqualsE(
a: "-|",
b: "--1-2-3-|",
e: "1,2,3,|")
}
func testBCompletesWithoutValue() {
assertAopBEqualsE(
a: "-a--b|",
b: "|",
e: "a,b,|")
}
func testNoData() {
assertAopBEqualsE(
a: "-|",
b: "|",
e: "|")
}
func assertAopBEqualsE(_ scheduler: TestScheduler = TestScheduler(initialClock: 0), a: String, b: String, e: String, file: StaticString = #file, line: UInt = #line) {
let aStream = scheduler.createColdObservable(events(a))
let bStream = scheduler.createColdObservable(events(b))
let eStream = expected(e)
let bResults = scheduler.start {
Observable<(String)>.zipRepeatCollected(aStream.asObservable(), bStream.asObservable()).map { "\($0 ?? "")\($1 ?? "")" }
}
XCTAssertEqual(eStream, bResults.events.map { $0.value }, file: file, line: line)
}
func expected(_ stream: String) -> [Event<String>] {
stream.split(separator: ",").map { String($0) == "|" ? .completed : .next(String($0)) }
}
func events(_ stream: String, step: Int = 10) -> [Recorded<Event<String>>] {
var time = 0
var events = [Recorded<Event<String>>]()
stream.forEach { c in
if c == "|" {
events.append(.completed(time))
} else if c != "-" {
events.append(.next(time, String(c)))
}
time += step
}
return events
}
}