Отложенная система блокировки для гарантированного заказа - PullRequest
0 голосов
/ 30 июня 2018

Есть ли в Java встроенный механизм блокировки, при котором каждый «берет число» (устанавливает будущий порядок блокировки), выполняет некоторую работу и более поздние блоки до тех пор, пока «сейчас не отслужит» читает наш номер заявки?

Вот обзор псевдокода того, что я имею в виду:

public CompletionStage<Void> onEvent()
{
  return CompletableFuture.runAsync(() ->
  {
    // Establish future locking order without acquiring a lock
    int ticket = takeTicket();
    // Execute network request asynchronously
    return CompletableFuture.runAsync(() -> networkRequest(), threadPool).
      thenCompose(() ->
      {
        // Trying acquiring a lock. If the locking system is
        // waiting for someone with a smaller number, block
        // until our turn comes up.
        lock(ticket);
        // ... do work that depends on strict event ordering ...
        return CompletableFuture.runAsync(() ->
        {
          fireMoreEvents();
          // Permanently discard the lock, allowing the next user to lock.
          release(ticket);
        }, eventDispatcher);
      });
  }, eventDispatcher);
}
  • Номера билетов должны обрабатываться в последовательном порядке.
  • Если пользователи ожидают блокировки, а пользователь, владеющий текущим билетом, не блокирует и не разблокирует в течение timeout миллисекунд, сгенерируйте исключение (чтобы указать, что с моей стороны есть какая-то программная ошибка).
  • Попытка повторно использовать просроченный билет (тот, который уже использовался или истек срок действия) выдает исключение

Если это не встроено в Java, есть ли хороший способ реализовать это без пробуждения всех заблокированных потоков при каждом увеличении обслуживаемого числа?

Обоснование для механизма блокировки

У меня есть прослушиватель событий, который должен записывать события в порядке их поступления, но он должен записывать связанную информацию из асинхронного источника (например, сети) вместе с данными.

1 Ответ

0 голосов
/ 01 июля 2018

Насколько я понимаю, у нас есть ряд параллельных (основных) задач, и у каждой из них есть связанная задача очистки, которая должна быть запущена после выполнения основной задачи.

Мы хотим наложить порядок на задачи очистки; а также мы не хотим, чтобы любые два потока одновременно выполняли свои соответствующие разделы очистки.

Поскольку мы хотим защитить от одновременных очисток, я думаю, что имеет смысл иметь единый поток для выполнения всех наших задач очистки (это будет служба очистки).

Приведенное ниже решение основано на поставщике билетов, который в основном создает билеты и помещает их в очередь очистки в том же порядке, в котором они были созданы. Нить очистки постоянно ждет «текущего» билета; и как только билет готов к очистке, выполняется очистка и обрабатывается следующий билет.

Все билеты заказаны (по порядку их вставки в службу очистки; также у них есть номер).

Для каждого тикета / задачи мы можем использовать CountDownLatch, чтобы любой рабочий поток мог сообщить о завершении основной задачи потоку очистки (используя тикет).

См. Примерное решение ниже,

package stackOver;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SequentialCleanup {
  // it is (obviously) important that this executor has one thread only
  ExecutorService executorCleanup = Executors.newFixedThreadPool(1);
  ExecutorService workerEs = Executors.newFixedThreadPool(20);

  public static void main(String[] args) throws Exception {
    SequentialCleanup o = new SequentialCleanup();
    o.go();
  }

  private void go() throws InterruptedException, ExecutionException {
    TicketProvider tp = new TicketProvider(executorCleanup);
    EventListener el = new EventListener(tp, workerEs);

    for (int i=0; i<20;i++) {
      el.onEvent();
    }
    Thread.sleep(10_000L);
    executorCleanup.shutdown();
    workerEs.shutdown();
  }
}

class EventListener {
  private TicketProvider tp;
  private ExecutorService workers;
  public EventListener(TicketProvider tp, ExecutorService workers) {
    this.tp = tp;
    this.workers = workers;
  }
  public CompletionStage<Void> onEvent() {
    Ticket ticket = tp.takeTicket();
    return runAsyncCode().thenRun(
        () -> {
          // on finish, have the cleanup service run our cleanup
          ticket.onWorkerFinish( ()-> {
            // put cleanup code here
            System.out.println("cleanups are orderer by ticket="+ticket);
          });
    });
  }
  private CompletionStage<Void> runAsyncCode() {
    CompletableFuture<Void> res = new CompletableFuture<>();
    workers.submit(
        ()-> {
          System.out.println("doing some work..");
          try { Thread.sleep(1000+(long)(Math.random()*1000)); } catch (Exception e) { }
          System.out.println("done");
          res.complete(null);
        }
        );
    return res;
  }
}

class Ticket {
  private int number;
  private CountDownLatch workerHasFinished = new CountDownLatch(1);
  private volatile Runnable cleanup;

  public Ticket(int number) {
    this.number = number;
  }
  // after the worker has finished the main task, it calls onWorkerFinish()
  public void onWorkerFinish(Runnable cleanup) {
    this.cleanup = cleanup;
    workerHasFinished.countDown();
  }

  // awaits end of the job, then cleans up
  public void waitThenCleanup() {
    try {
      if (workerHasFinished.await(2000L, TimeUnit.MILLISECONDS)) {
        System.out.println("cleanup ticket num=" + number);
        cleanup.run();
        System.out.println("end cleanup num=" + number);
      }
      else {
        System.out.println("cleanup skipped for ticket=" + number + ", time elapsed");
      }
    } catch (Exception e) {
    }
  }
  @Override
  public String toString() {
    return "Ticket["+number+"]";
  }
}

class TicketProvider {
  int ticketCounter = 0;
  private ExecutorService esCleanup;

  public TicketProvider(ExecutorService es) {
    this.esCleanup = es;
  }

  public synchronized Ticket takeTicket() {
    System.out.println("returning ticket " + ticketCounter);
    Ticket ticket = new Ticket(ticketCounter++);
    // enqueue for the cleanup
    esCleanup.submit(ticket::waitThenCleanup);
    return ticket;
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...