publishSubject Синхронизация предупреждение об аномалии - PullRequest
0 голосов
/ 28 января 2019

Я играл с RxSwift на детской площадке, и я столкнулся с предупреждением.Вот полное предупреждающее сообщение:

Synchronization anomaly was detected.
- Debugging: To debug this issue you can set a breakpoint in RxSwift/RxSwift/Rx.swift:113 and observe the call stack.
Problem: This behavior is breaking the observable sequence grammar. `next (error | completed)?`
- This behavior breaks the grammar because there is overlapping between sequence events.
Observable sequence is trying to send an event before sending of previous event has finished.
- Interpretation: Two different unsynchronized threads are trying to send some event simultaneously.
This is undefined behavior because the ordering of the effects caused by these events is nondeterministic and depends on the 
operating system thread scheduler. This will result in a random behavior of your program.
- Remedy: If this is the expected behavior this message can be suppressed by adding `.observeOn(MainScheduler.asyncInstance)`
or by synchronizing sequence events in some other way.

Вот код на игровой площадке.

import RxSwift
import Foundation

example("PublishSubject") {
let disposeBag = DisposeBag()
let subject = PublishSubject<String>()


subject.onNext("?")
subject.subscribe(onNext: { (value) in
    print(value)
}).disposed(by: disposeBag)
subject.onNext("?")

subject.onNext("?️")
subject.onNext("?️")
DispatchQueue.global(qos: .utility).async {
    for index in 0...10 {
        subject.onNext("1")
    }
          subject.observeOn(MainScheduler.asyncInstance).subscribe(onNext: { (value) in
        print(value)
    }).disposed(by: disposeBag)
}
DispatchQueue.global(qos: .utility).async {
    for index in 0...10 {
        subject.onNext("B")
    }
    subject.observeOn(MainScheduler.asyncInstance).subscribe(onNext: { (value) in
        print(value)
    }).disposed(by: disposeBag)
}
}

Как я могу решить эту проблему?ТНХ

1 Ответ

0 голосов
/ 28 января 2019

Вы отправляете событие по вашей теме, когда оно находится в процессе обработки события.Это нарушает контракт, который должны поддерживать субъекты.

В частности, субъекты не имеют какого-либо отслеживания потоков, поэтому вы должны сделать это вручную.Наиболее очевидный способ - установить рекурсивную блокировку вокруг ваших вызовов onNext, чтобы они не перекрывались при работе в отдельных потоках.

let disposeBag = DisposeBag()
let subject = PublishSubject<String>()
let lock = NSRecursiveLock()

subject.onNext("?")
subject.subscribe(onNext: { (value) in
    print(value)
}).disposed(by: disposeBag)
subject.onNext("?")

subject.onNext("?️")
subject.onNext("?️")
DispatchQueue.global(qos: .utility).async {
    for index in 0...10 {
        lock.lock()
        subject.onNext("1")
        lock.unlock()
    }
    subject.observeOn(MainScheduler.asyncInstance).subscribe(onNext: { (value) in
        print(value)
    }).disposed(by: disposeBag)
}
DispatchQueue.global(qos: .utility).async {
    for index in 0...10 {
        lock.lock()
        subject.onNext("B")
        lock.unlock()
    }
    subject.observeOn(MainScheduler.asyncInstance).subscribe(onNext: { (value) in
        print(value)
    }).disposed(by: disposeBag)
}
...