Я изучаю Java и смог выполнить многопоточность с моими существующими приложениями, используя runnable.Теперь я смотрел на прерыватель (чтобы разделить переменные между потоками), но я не могу понять, как автор на самом деле порождает потоки.
Я вижу, что он использует Executor, который я использую для отправки исполняемых классов вв моей программе, но в этом примере нет отправки (или выполнения).Я узнал только то, что знаю из руководств по Oracle, и в них упоминается, что есть только два способа - расширять потоки или реализовывать работоспособный (я не вижу и здесь, но он отправляет executor в disruptor, что, возможно, как он создает потоки?).Я что-то упускаю или этот человек делает это по-другому?Моя конечная цель - понять этот код (который работает отлично), чтобы я мог применить его к своему существующему (с помощью запускаемого) кода.
Вот код, о котором идет речь:
Приложение.java
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.*;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class App {
private final static int RING_SIZE = 1024 * 8;
private static long handleCount = 0;
private final static long ITERATIONS = 1000L * 1000L * 300L;
private final static int NUM_EVENT_PROCESSORS = 8;
private final static EventHandler<ValueEvent> handler =
new EventHandler<ValueEvent>() {
public void onEvent(final ValueEvent event,
final long sequence,
final boolean endOfBatch) throws Exception {
handleCount++;
}
};
public static void main(String[] args) {
System.out.println("Starting disruptor app.");
ExecutorService executor = Executors.newFixedThreadPool(NUM_EVENT_PROCESSORS);
Disruptor<ValueEvent> disruptor =
new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY, executor,
new SingleThreadedClaimStrategy(RING_SIZE),
new SleepingWaitStrategy());
disruptor.handleEventsWith(handler);
RingBuffer<ValueEvent> ringBuffer = disruptor.start();
long start = System.currentTimeMillis();
long sequence;
ValueEvent event;
for (long x=0; x<ITERATIONS; x++) {
sequence = ringBuffer.next();
event = ringBuffer.get(sequence);
event.setValue(x);
ringBuffer.publish(sequence);
}
final long expectedSequence = ringBuffer.getCursor();
while (handleCount < expectedSequence) { }
long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
System.out.printf("op/s: %d, handled: %d", opsPerSecond, handleCount);
}
}
Обновление: если Disruptor обрабатывает порождение потоков, то как я могу передать ему свой существующий исполняемый класс?или мне нужно заново переработать код?Извините, я немного сбит с толку, если нарушитель будет работать с существующим кодом или если мне нужно полностью изменить мои вещи для него.