AtomicReference для изменяемого объекта и видимости - PullRequest
8 голосов
/ 21 февраля 2012

Скажите, у меня есть AtomicReference к списку объектов:

AtomicReference<List<?>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>());

Тема A добавляет элементы в этот список: batch.get().add(o);

Позже, поток B берет список и, например, сохраняет его в БД: insertBatch(batch.get());

Нужно ли выполнять дополнительную синхронизацию при записи (Поток A) и чтении (Поток B)чтобы убедиться, что поток B видит список так, как его покинул A, или AtomicReference позаботился об этом?

Другими словами: если у меня есть AtomicReference для изменяемого объекта, и один поток изменяет этот объект,другие потоки видят это изменение немедленно?

Редактировать:

Может быть, приведен пример кода:

public void process(Reader in) throws IOException {
    List<Future<AtomicReference<List<Object>>>> tasks = new ArrayList<Future<AtomicReference<List<Object>>>>();
    ExecutorService exec = Executors.newFixedThreadPool(4);

    for (int i = 0; i < 4; ++i) {
        tasks.add(exec.submit(new Callable<AtomicReference<List<Object>>>() {
            @Override public AtomicReference<List<Object>> call() throws IOException {

                final AtomicReference<List<Object>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>(batchSize));

                Processor.this.parser.parse(in, new Parser.Handler() {
                    @Override public void onNewObject(Object event) {
                            batch.get().add(event);

                            if (batch.get().size() >= batchSize) {
                                dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));
                            }
                    }
                });

                return batch;
            }
        }));
    }

    List<Object> remainingBatches = new ArrayList<Object>();

    for (Future<AtomicReference<List<Object>>> task : tasks) {
        try {
            AtomicReference<List<Object>> remainingBatch = task.get();
            remainingBatches.addAll(remainingBatch.get());
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();

            if (cause instanceof IOException) {
                throw (IOException)cause;
            }

            throw (RuntimeException)cause;
        }
    }

    // these haven't been flushed yet by the worker threads
    if (!remainingBatches.isEmpty()) {
        dao.insertBatch(remainingBatches);
    }
}

Здесь происходит то, что я создаю четыре рабочих потока дляразбирать текст (это параметр Reader in для метода process()).Каждый работник сохраняет строки, которые он проанализировал в пакете, и сбрасывает пакет, когда он заполнен (dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));).

Поскольку число строк в тексте не кратно размеру пакета,последние объекты заканчиваются в пакете, который не очищен, так как он не полон.Поэтому эти оставшиеся партии вставляются основным потоком.

Я использую AtomicReference.getAndSet(), чтобы заменить полную партию пустой.Правильно ли эта программа в отношении потоков?

Ответы [ 4 ]

10 голосов
/ 21 февраля 2012

Хм ... на самом деле это не так. AtomicReference гарантирует, что сама ссылка видима в потоках, т. Е. Если вы назначите ей ссылку, отличную от исходной, обновление будет видно. Он не дает никаких гарантий относительно фактического содержимого объекта, на который указывает ссылка.

Поэтому операции чтения / записи содержимого списка требуют отдельной синхронизации.

Редактировать : Таким образом, исходя из обновленного кода и опубликованного комментария, для обеспечения видимости достаточно установить локальную ссылку на volatile.

1 голос
/ 14 августа 2012

Я думаю, что, забыв весь код здесь, вы задаете следующий вопрос:

Нужно ли выполнять дополнительную синхронизацию при записи (поток A) и чтение (поток B), чтобы убедиться, что поток B видит список так, как его покинул A, или AtomicReference позаботился об этом?

Итак, точный ответ на это: ДА , атомная забота о видимости. И это не мое мнение, а документация JDK :

Эффекты памяти для обращений и обновлений атомных элементов обычно следуют правилам для летучих, как указано в Спецификации языка Java, третье издание (17.4 Модель памяти).

Надеюсь, это поможет.

0 голосов
/ 21 февраля 2012

AtomicReference поможет вам только со ссылкой на список, он не будет ничего делать с самим списком. В частности, в вашем сценарии вы почти наверняка столкнетесь с проблемами, когда система находится под нагрузкой, когда потребитель взял список, а производитель добавляет в него элемент.

Этот звук для меня, как будто вы должны использовать BlockingQueue. Затем вы можете ограничить объем памяти, если ваш производитель работает быстрее, чем ваш потребитель, и позволить очереди обрабатывать все конфликты.

Что-то вроде:

ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object> (50);

// ... Producer
queue.put(o);

// ... Consumer
List<Object> queueContents = new ArrayList<Object> ();
// Grab everything waiting in the queue in one chunk. Should never be more than 50 items.
queue.drainTo(queueContents);

Добавлена ​​

Спасибо @Tudor за указание на используемую вами архитектуру. ... Должен признать, это довольно странно. На самом деле вам не нужен AtomicReference, насколько я вижу. Каждый поток владеет своим собственным ArrayList до тех пор, пока не будет передан в dao, после чего он будет заменен, так что нигде не будет никаких конфликтов.

Я немного обеспокоен тем, что вы создаете четыре парсера для одного Reader. Надеюсь, у вас есть какой-то способ убедиться, что каждый синтаксический анализатор не влияет на другие.

Лично я бы использовал некоторую форму шаблона производитель-потребитель, как я описал в коде выше. Возможно, что-то вроде этого.

static final int PROCESSES = 4;
static final int batchSize = 10;

public void process(Reader in) throws IOException, InterruptedException {

  final List<Future<Void>> tasks = new ArrayList<Future<Void>>();
  ExecutorService exec = Executors.newFixedThreadPool(PROCESSES);
  // Queue of objects.
  final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object> (batchSize * 2);
  // The final object to post.
  final Object FINISHED = new Object();

  // Start the producers.
  for (int i = 0; i < PROCESSES; i++) {
    tasks.add(exec.submit(new Callable<Void>() {
      @Override
      public Void call() throws IOException {

        Processor.this.parser.parse(in, new Parser.Handler() {
          @Override
          public void onNewObject(Object event) {
            queue.add(event);
          }
        });
        // Post a finished down the queue.
        queue.add(FINISHED);
        return null;
      }
    }));
  }

  // Start the consumer.
  tasks.add(exec.submit(new Callable<Void>() {
    @Override
    public Void call() throws IOException {
      List<Object> batch = new ArrayList<Object>(batchSize);
      int finishedCount = 0;
      // Until all threads finished.
      while ( finishedCount < PROCESSES ) {
        Object o = queue.take();
        if ( o != FINISHED ) {
          // Batch them up.
          batch.add(o);
          if ( batch.size() >= batchSize ) {
            dao.insertBatch(batch);
            // If insertBatch takes a copy we could merely clear it.
            batch = new ArrayList<Object>(batchSize);
          }
        } else {
          // Count the finishes.
          finishedCount += 1;
        }
      }
      // Finished! Post any incopmplete batch.
      if ( batch.size() > 0 ) {
        dao.insertBatch(batch);
      }
      return null;
    }
  }));

  // Wait for everything to finish.
  exec.shutdown();
  // Wait until all is done.
  boolean finished = false;
  do {
    try {
      // Wait up to 1 second for termination.
      finished = exec.awaitTermination(1, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
    }
  } while (!finished);
}
0 голосов
/ 21 февраля 2012

Добавление к ответу Tudor : вам придется сделать сам поток ArrayList поточно-ориентированным или - в зависимости от ваших требований - еще большие блоки кода.

Если вы можете избежать проблем с потоком ArrayList, вы можете "украсить" его так:

batch = java.util.Collections.synchronizedList(new ArrayList<Object>());

Но имейте в виду: даже такие "простые" конструкции, как это, not threadsafe с этим:

Object o = batch.get(batch.size()-1);
...