Не получает входные данные при использовании `.receive (on: DispatchQueue.main)` - PullRequest
0 голосов
/ 20 июня 2019

Я пытаюсь перейти на основной поток в нисходящем потоке с помощью .receive(on: DispatchQueue.main), но тогда я не получаю входные данные при использовании .subscribe(:) или .sink(receiveValue:). Если я не меняю темы, я получаю соответствующие данные.

Издатель

extension URLSessionWebSocketTask {
  struct ReceivePublisher: Publisher {
    typealias Output = Message
    typealias Failure = Error

    let task: URLSessionWebSocketTask

    func receive<S>(subscriber: S) where S: Subscriber, Output == S.Input, Failure == S.Failure {
      task.receive { result in
        switch result {
        case .success(let message): _ = subscriber.receive(message)
        case .failure(let error): subscriber.receive(completion: .failure(error))
        }
      }
    }
  }
}

extension URLSessionWebSocketTask {
  func receivePublisher() -> ReceivePublisher {
    ReceivePublisher(task: self)
  }
}

Абонентская

extension ViewModel: Subscriber {
  typealias Input = URLSessionWebSocketTask.Message
  typealias Failure = Error

  func receive(subscription: Subscription) {}

  func receive(_ input: URLSessionWebSocketTask.Message) -> Subscribers.Demand {
    // Handle input here.
    // When using `.receive(on:)` this method is not called when should be.
    return .unlimited
  }

  func receive(completion: Subscribers.Completion<Error>) {}
}

Подписаться

socketTask.receivePublisher()
      .receive(on: DispatchQueue.main)
      .subscribe(viewModel)
socketTask.resume()

1 Ответ

1 голос
/ 30 июня 2019

AnyCancellable, возвращаемое subscribe<S>(_ subject: S) -> AnyCancellable, вызовет cancel(), когда оно будет деинициализировано. Поэтому, если вы не сохраните его, он будет деинициализирован, когда вызывающий блок выйдет из области видимости.

Из видео и учебных пособий, которые я видел в WWDC, о том, как с этим работать, никогда не обращались. Что я видел, так это то, что люди все больше склоняются к решению DisposeBag от RxSwift.

Обновление бета-версии 4: Combine теперь поставляется с методом на AnyCancellable, который называется: store(in:), который выполняет почти то же, что и мое старое решение. Вы можете просто хранить AnyCancellable s в наборе AnyCancellable:

var cancellables = Set<AnyCancellable>()
...
override func viewDidLoad() {
    super.viewDidLoad()
    ...
    socketTask.receivePublisher()
        .receive(on: DispatchQueue.main)
        .subscribe(viewModel)
        .store(in: &cancellables)
}

Таким образом, массив (и все AnyCancellable s) будет деинициализирован при деинициализации содержащего класса.

Устаревшие

Если вам нужно решение для всех Cancellable с, которое можно использовать таким образом, чтобы оно работало лучше, вы можете расширить Cancellable следующим образом:

extension Cancellable {

    func cancel(with cancellables: inout [AnyCancellable]) {
        if let cancellable = self as? AnyCancellable {
            cancellables.append(cancellable)
        } else {
            cancellables.append(AnyCancellable(self))
        }
    }

}

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...