Ниже приведен код, который это делает. Это как-то неловко, а может, и неправильно. Я могу представить себе состояние гонки, при котором сетевой запрос может начинаться и завершаться в другом потоке до того, как sub будет установлено значение, отличное от nil.
Опасность! Swift.Set
не является потокобезопасным. Если вы хотите получить доступ к Set
из двух разных потоков, вам нужно сериализовать доступы, чтобы они не перекрывались.
Что возможно в целом (хотя, возможно, не с URLSession.DataTaskPublisher
) заключается в том, что издатель излучает свои сигналы синхронно, прежде чем оператор sink
даже вернется. Так ведут себя Just
, Result.Publisher
, Publishers.Sequence
и другие. Таким образом, они создают проблему, которую вы описываете, без использования потоковой безопасности.
Теперь, как решить проблему? Если вы не думаете, что действительно хотите иметь возможность отменить подписку, вы можете вообще избежать создания AnyCancellable
, используя Subscribers.Sink
вместо оператора sink
:
URLSession.shared.dataTaskPublisher(for: request)
.map(\.data)
.decode(type: MyResponse.self, decoder: JSONDecoder())
.subscribe(Subscribers.Sink(
receiveCompletion: { completion in ... },
receiveValue: { response in ... }
))
Combine очистит подписку и подписчика после завершения подписки (с помощью .finished
или .failure
).
Но что, если вы делаете хотите иметь возможность отменить подписку ? Может быть, иногда ваш SomeThing
уничтожается до того, как подписка будет завершена, и в этом случае вам не нужна подписка для завершения. Затем вы хотите создать AnyCancellable
и сохранить его в свойстве экземпляра, чтобы он отменялся при уничтожении SomeThing
.
В этом случае установите флаг, указывающий, что приемник выиграл гонку , и проверьте флаг перед сохранением AnyCancellable
.
var sub: AnyCancellable? = nil
var isComplete = false
sub = URLSession.shared.dataTaskPublisher(for: request)
.map(\.data)
.decode(type: MyResponse.self, decoder: JSONDecoder())
// This ensures thread safety, if the subscription is also created
// on DispatchQueue.main.
.receive(on: DispatchQueue.main)
.sink(
receiveCompletion: { [weak self] completion in
isComplete = true
if let theSub = sub {
self?.subs.remove(theSub)
}
},
receiveValue: { response in ... }
}
if !isComplete {
subs.insert(sub!)
}