Метрики из нескольких потоков - PullRequest
3 голосов
/ 18 мая 2011

Так что это похоже на довольно распространенный вариант использования, и, возможно, я слишком много думаю об этом, но у меня возникла проблема с сохранением централизованных метрик из нескольких потоков. Скажем, у меня есть несколько рабочих потоков, которые обрабатывают все записи, и каждые 1000 записей я хочу показать какой-то показатель. Теперь я мог бы записывать в каждый поток отдельные метрики, но потом получать числа пропускной способности, но мне пришлось бы добавлять их вручную (и, конечно, временные границы не будут точными). Вот простые примеры:

public class Worker implements Runnable {

   private static int count = 0;
   private static long processingTime = 0;

   public void run() {
       while (true) {
          ...get record
          count++;
          long start = System.currentTimeMillis();
          ...do work
          long end = System.currentTimeMillis();
          processingTime += (end-start);
          if (count % 1000 == 0) {
              ... log some metrics
              processingTime = 0;
              count = 0;
          }
       }
    }
}

Надеюсь, в этом есть смысл. Также я знаю, что две статические переменные, вероятно, будут AtomicInteger и AtomicLong. , , но может и нет. Интересует, какие у людей есть идеи. Я думал об использовании атомарных переменных и об использовании ReeantrantReadWriteLock - но я действительно не хочу, чтобы метрики останавливали поток обработки (т. Е. Метрики должны оказывать очень минимальное влияние на обработку). Спасибо.

Ответы [ 3 ]

3 голосов
/ 18 мая 2011

Выгрузка фактической обработки в другой поток может быть хорошей идеей.Идея состоит в том, чтобы инкапсулировать ваши данные и быстро передать их потоку обработки, чтобы свести к минимуму влияние на потоки, выполняющие значимую работу.

Существует небольшая конкуренция за передачу обслуживания, но эта стоимость обычно намного меньшечем любой другой тип синхронизации, он должен быть хорошим кандидатом во многих ситуациях.Я думаю, что решение М. Джессапа довольно близко к моему, но, надеюсь, следующий код ясно иллюстрирует это.

public class Worker implements Runnable {

   private static final Metrics metrics = new Metrics();

   public void run() {
      while (true) {
        ...get record
        long start = System.currentTimeMillis();
        ...do work
        long end = System.currentTimeMillis();
        // process the metric asynchronously
        metrics.addMetric(end - start);
     }
  }

  private static final class Metrics {
     // a single "background" thread that actually handles
     // processing
     private final ExecutorService metricThread = 
           Executors.newSingleThreadExecutor();
     // data (no synchronization needed)
     private int count = 0;
     private long processingTime = 0;

     public void addMetric(final long time) {
        metricThread.execute(new Runnable() {
           public void run() {
              count++;
              processingTime += time;
              if (count % 1000 == 0) {
                 ... log some metrics
                 processingTime = 0;
                 count = 0;
              }
           }
        });
      }
   }
}
2 голосов
/ 18 мая 2011

Я бы посоветовал, если вы не хотите, чтобы ведение журнала мешало обработке, у вас должен быть отдельный рабочий поток журнала, и ваши потоки обработки просто предоставляют некоторый тип объекта значения, который можно передать.В этом примере я выбираю LinkedBlockingQueue, поскольку он имеет возможность блокировать на незначительное время с помощью offer (), и вы можете отложить блокировку до другого потока, который извлекает значения из очереди.Возможно, вам понадобится увеличить логику в MetricProcessor для упорядочения данных и т. Д. В зависимости от ваших требований, но даже если это длительная операция, она не помешает планировщику потоков ВМ в то же время перезапустить реальные потоки обработки.

public class Worker implements Runnable {

  public void run() {
    while (true) {
      ... do some stuff
      if (count % 1000 == 0) {
        ... log some metrics
        if(MetricProcessor.getInstance().addMetrics(
            new Metrics(processingTime, count, ...)) {
          processingTime = 0;
          count = 0;
        } else {
          //the call would have blocked for a more significant
          //amount of time, here the results
          //could be abandoned or just held and attempted again
          //as a larger data set later
        }
      }
    }
  }
}

public class WorkerMetrics {
  ...some interesting data
  public WorkerMetrics(... data){
    ...
  }
  ...getter setters etc
}

public class MetricProcessor implements Runnable {
  LinkedBlockingQueue metrics = new LinkedBlockingQueue();
  public boolean addMetrics(WorkerMetrics m) {
    return metrics.offer(m); //This may block, but not for a significant amount of time.
  }

  public void run() {
    while(true) {
      WorkMetrics m = metrics.take(); //wait here for something to come in
      //the above call does all the significant blocking without
      //interrupting the real processing
      ...do some actual logging, aggregation, etc of the metrics
    }
  }
}
1 голос
/ 18 мая 2011

Если вы зависите от состояния подсчета и состояния processingTime для синхронизации, то вам придется использовать блокировку.Например, если значение ++count % 1000 == 0 равно true, вы хотите оценить метрики processingTime в то время.

Для этого случая имеет смысл использовать ReentrantLock.Я бы не использовал RRWL, потому что на самом деле не существует случая, когда происходит чистое чтение.Это всегда набор для чтения / записи.Но вам нужно будет заблокировать все

  count++
  processingTime += (end-start);
  if (count % 1000 == 0) {
      ... log some metrics
      processingTime = 0;
      count = 0;
  }

Независимо от того, будет ли count ++ находиться в этом месте, вам нужно будет также обойти это.Наконец, если вы используете блокировку, вам не нужны AtomicLong и AtomicInteger.Это только увеличивает накладные расходы и не является более поточно-ориентированным.

...