rxJava Упорядоченное (по ключу) выполнение задачи - PullRequest
0 голосов
/ 04 февраля 2019

У меня есть куча объектов, представляющих некоторые данные.Эти объекты могут быть записаны в соответствующие им файлы.Пользователь может потребовать внесения некоторых изменений быстрее, чем предыдущие изменения, записанные в файл.

Скажем, я внесу изменения в Файл A, Файл B и Файл C и отправлю их на исполнение.Затем, пока они пишутся, я делаю изменения в файле A и публикую его.Например, работают 3 потока.Как только будут выполнены первые изменения A, B и C (записаны в файлы), 1-е и 2-е изменения A будут выполнены почти одновременно.Однако я хочу, чтобы 2-е изменение было применено после того, как 1-е сделано.

Как я могу сделать это в rxJava?

Еще один момент.В другом месте я хочу запустить действие с последними изменениями.Один из вариантов - подождать, пока все задачи не будут завершены.

Есть ли подходящий примитив / подход RxJava, который, как мы надеемся, охватил бы эти 2 варианта использования?

Я новичок в RxJava, но надеюсь, что это имеет смысл,Subjects мне кажется уместным, но там будут сотни файлов.

У меня уже есть реализация, использующая пользовательские Executor.

public class OrderingExecutor
implements Executor
{
    @Delegate
    private final Executor delegate;
    private final Map<Object, Queue<Runnable>> keyedTasks = new HashMap<>();

    public OrderingExecutor(
        Executor delegate)
    {
        this.delegate = delegate;
    }

    public void execute(
        Runnable task,
        Object key)
    {
        Objects.requireNonNull(key);

        boolean first;
        Runnable wrappedTask;
        synchronized (keyedTasks)
        {
            Queue<Runnable> dependencyQueue = keyedTasks.get(key);
            first = (dependencyQueue == null);
            if (dependencyQueue == null)
            {
                dependencyQueue = new LinkedList<>();
                keyedTasks.put(key, dependencyQueue);
            }

            wrappedTask = wrap(task, dependencyQueue, key);
            if (!first)
            {
                dependencyQueue.add(wrappedTask);
            }
        }

        // execute method can block, call it outside synchronize block
        if (first)
        {
            delegate.execute(wrappedTask);
        }

    }

    private Runnable wrap(
        Runnable task,
        Queue<Runnable> dependencyQueue,
        Object key)
    {
        return new OrderedTask(task, dependencyQueue, key);
    }

    class OrderedTask
    implements Runnable
    {

        private final Queue<Runnable> dependencyQueue;
        private final Runnable task;
        private final Object key;

        public OrderedTask(
            Runnable task,
            Queue<Runnable> dependencyQueue,
            Object key)
        {
            this.task = task;
            this.dependencyQueue = dependencyQueue;
            this.key = key;
        }

        @Override
        public void run()
        {
            try
            {
                task.run();
            }
            finally
            {
                Runnable nextTask = null;
                synchronized (keyedTasks)
                {
                    if (dependencyQueue.isEmpty())
                    {
                        keyedTasks.remove(key);
                    }
                    else
                    {
                        nextTask = dependencyQueue.poll();
                    }
                }
                if (nextTask != null)
                {
                    delegate.execute(nextTask);
                }
            }
        }
    }
}

Может быть, какой-то разумный способ подключитьэто в rxJava?

1 Ответ

0 голосов
/ 04 февраля 2019

Не совсем понятно, чего вы пытаетесь достичь, но вы можете наложить приоритетную очередь поверх RxJava.

class OrderedTask implements Comparable<OrderedTask> { ... }

PriorityBlockingQueue<OrderedTask> queue = new PriorityBlockingQueue<>();

PublishSubject<Integer> trigger = PublishSubject.create();

trigger.flatMap(v -> {
   OrderedTask t = queue.poll();
   return someAPI.workWith(t);
}, 1)
.subscribe(result -> { }, error -> { });

queue.offer(new SomeOrderedTask(1));
trigger.onNext(1);

queue.offer(new SomeOrderedTask(2));
trigger.onNext(2);
...