У меня есть куча объектов, представляющих некоторые данные.Эти объекты могут быть записаны в соответствующие им файлы.Пользователь может потребовать внесения некоторых изменений быстрее, чем предыдущие изменения, записанные в файл.
Скажем, я внесу изменения в Файл A, Файл B и Файл C и отправлю их на исполнение.Затем, пока они пишутся, я делаю изменения в файле A и публикую его.Например, работают 3 потока.Как только будут выполнены первые изменения A, B и C (записаны в файлы), 1-е и 2-е изменения A будут выполнены почти одновременно.Однако я хочу, чтобы 2-е изменение было применено после того, как 1-е сделано.
Как я могу сделать это в rxJava?
Еще один момент.В другом месте я хочу запустить действие с последними изменениями.Один из вариантов - подождать, пока все задачи не будут завершены.
Есть ли подходящий примитив / подход RxJava, который, как мы надеемся, охватил бы эти 2 варианта использования?
Я новичок в RxJava, но надеюсь, что это имеет смысл,Subjects
мне кажется уместным, но там будут сотни файлов.
У меня уже есть реализация, использующая пользовательские Executor
.
public class OrderingExecutor
implements Executor
{
@Delegate
private final Executor delegate;
private final Map<Object, Queue<Runnable>> keyedTasks = new HashMap<>();
public OrderingExecutor(
Executor delegate)
{
this.delegate = delegate;
}
public void execute(
Runnable task,
Object key)
{
Objects.requireNonNull(key);
boolean first;
Runnable wrappedTask;
synchronized (keyedTasks)
{
Queue<Runnable> dependencyQueue = keyedTasks.get(key);
first = (dependencyQueue == null);
if (dependencyQueue == null)
{
dependencyQueue = new LinkedList<>();
keyedTasks.put(key, dependencyQueue);
}
wrappedTask = wrap(task, dependencyQueue, key);
if (!first)
{
dependencyQueue.add(wrappedTask);
}
}
// execute method can block, call it outside synchronize block
if (first)
{
delegate.execute(wrappedTask);
}
}
private Runnable wrap(
Runnable task,
Queue<Runnable> dependencyQueue,
Object key)
{
return new OrderedTask(task, dependencyQueue, key);
}
class OrderedTask
implements Runnable
{
private final Queue<Runnable> dependencyQueue;
private final Runnable task;
private final Object key;
public OrderedTask(
Runnable task,
Queue<Runnable> dependencyQueue,
Object key)
{
this.task = task;
this.dependencyQueue = dependencyQueue;
this.key = key;
}
@Override
public void run()
{
try
{
task.run();
}
finally
{
Runnable nextTask = null;
synchronized (keyedTasks)
{
if (dependencyQueue.isEmpty())
{
keyedTasks.remove(key);
}
else
{
nextTask = dependencyQueue.poll();
}
}
if (nextTask != null)
{
delegate.execute(nextTask);
}
}
}
}
}
Может быть, какой-то разумный способ подключитьэто в rxJava?