RxSwift - Наблюдаемые очереди различного типа от Singleton - PullRequest
0 голосов
/ 17 мая 2018

У меня есть класс Singleton (на самом деле он собирает очистку веб-страниц, но упрощенно здесь) с некоторыми общедоступными функциями. Все функции возвращают Single<T>, но разных типов.

Это может выглядеть так:

class Singleton {
    static let shared = Singleton()
    private init() { }

    func doSomethingInt() -> Single<Int> {
        return Single.just(1)
            .delay(3, scheduler: MainScheduler.instance)
    }

    func doSomethingString() -> Single<String> {
        return Single.just("Wow")
            .delay(3, scheduler: MainScheduler.instance)
    }
}

Когда кто-то вызывает Singleton.shared.doSomthingInt(), функция должна быть помещена в очередь, а не выполняться до тех пор, пока она не пройдет через очередь. Следующее наблюдаемое в очереди не должно начинаться до того, как все завершится. В идеале Singleton будет иметь функцию, которая будет задерживать выполнение каждой функции, которая передается ему. Примерно так:

private func placeInQueue<T: Any>(operation: Single<T>) -> Single<T> {
    // place in some magic shared queue
    return operation
}

И тогда я мог бы просто связать эту функцию в начале функций, которые должны быть помещены в очередь, например:

func doSomethingString() -> Single<String> {
    let operation = Single.just("Wow")
        .delay(3, scheduler: MainScheduler.instance)
    return placeInQueue(operation)
}

Я чувствую, что это возможно как-то с помощью операции concat, но я пока не смог ее решить.

Есть какие-нибудь подсказки?

1 Ответ

0 голосов
/ 17 мая 2018

Я создал этот класс, который, кажется, работает:)

Возможно, это проблема с тем, на каком планировщике он работает.По крайней мере, если я добавлю некоторые операции в очередь, каждая с задержкой в ​​3 секунды на MainScheduler, я могу видеть, что некоторые из операций с очередями завершаются более или менее через 3,5 ~ 4 секунды после предыдущей операции.завершено.Это не такая уж большая проблема для меня:)

class ObservableQueue {
    init() { }

    private var queueArray = [(operation: Observable<Void>, id: Double)]()

    /// Adding the `operation` to an internal queue. Starts execution of the `operation` when all previous operations in the queue had sendt an stop event.
    func placeInQueue<T: Any>(_ operation: Single<T>) -> Single<T> {
        let operationId = createId()
        let queuedOperation = currentQueue()
            .flatMap { _ -> Single<T> in
                return operation
            }
            .do(
                onNext: { [weak self] _ in self?.removeFromQueue(id: operationId) },
                onError: { [weak self] _ in self?.removeFromQueue(id: operationId)
            })
        let queueableOperation = operation
            .map { _ in return () }
            .asObservable()
            .catchErrorJustReturn(())
        addToQueue(queueableOperation, id: operationId)
        return queuedOperation
    }

    private func createId() -> Double {
        var operationId: Double = Date().timeIntervalSince1970
        while (queueArray.contains { $0.id == operationId }) {
            operationId = Date().timeIntervalSince1970
        }
        return operationId
    }

    private func currentQueue() -> Single<Void> {
        var queue = queueArray.map{ $0.operation }
        if queue.isEmpty {
            queue = [Observable.just(())]
        }
        return Observable.concat(queue).takeLast(1).asSingle()
    }

    private func addToQueue(_ operation: Observable<Void>, id: Double) {
        queueArray.append((operation: operation, id: id))
    }

    private func removeFromQueue(id: Double) {
        guard let index = (queueArray.index { $0.id == id }) else { return }
        queueArray.remove(at: index)
    }
}

Вот как я это использую:

private let queue = ObservableQueue()

func doSomethingInt() -> Single<Int> {
    let operation = Single.just(1)
        .delay(3, scheduler: MainScheduler.instance)
    return queue.placeInQueue(operation)
}

Надеюсь, это кому-нибудь поможет :) Пожалуйста, не стесняйтесь сообщать об этомрешение или лучшее решение, если у вас есть какие-либо ..

...