Java всегда предоставляла механизм для поддержания состояния и продолжения выполнения в более поздний момент времени: потоки. Основная идея моего библиотечного решения состоит в том, чтобы позволить ConcurrentIterable
создать элементов в одном потоке, а ConcurrentIterator
потреблять их в другом, связываясь через ограниченную очередь. (Это обычно называется шаблоном производитель / потребитель.)
Во-первых, вот демонстрация упрощенного использования:
public class CustomCollection<T> extends ConcurrentIterable<T>
{
private T[] data;
private int size;
@Override
protected void provideElements()
{
for (int i = 0; i < size; ++i)
{
provideElement(data[i]);
}
}
// ...
}
Обратите внимание на полное отсутствие конечных автоматов. Все, что вам нужно сделать, это извлечь из ConcurrentIterable
и реализовать метод provideElements
. Внутри этого метода вы пишете простой код, который вызывает provideElement
для каждого элемента в коллекции.
Иногда клиент не выполняет итерацию по всей коллекции, например, при линейном поиске. Вы можете прекратить предоставлять элементы, как только обнаружен аборт, проверив iterationAborted()
:
@Override
protected void provideElements()
{
for (int i = 0; i < size && !iterationAborted(); ++i)
{
provideElement(data[i]);
}
}
Совершенно нормально не проверять iterationAborted()
, если вы не заботитесь о генерируемых дополнительных элементах. Для бесконечных последовательностей проверка iterationAborted()
обязательна.
Как производитель может определить, что потребитель прекратил итерации? Это реализуется путем наличия сильной ссылки на токен у потребителя и слабой ссылки на тот же токен у производителя. Когда потребитель прекращает итерацию, токен становится пригодным для сбора мусора, и в конечном итоге он становится невидимым для производителя. С этого момента все новые элементы будут просто отбрасываться.
(Без этой меры предосторожности при определенных обстоятельствах ограниченная очередь может в конечном итоге заполниться, производитель войдет в бесконечный цикл, и содержащиеся элементы никогда не будут собираться мусором.)
А теперь о деталях реализации:
ConcurrentIterable.java
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public abstract class ConcurrentIterable<T> implements Iterable<T>
{
private static final int CAP = 1000;
private final ThreadLocal<CommunicationChannel<T>> channels
= new ThreadLocal<CommunicationChannel<T>>();
@Override
public Iterator<T> iterator()
{
BlockingQueue<Option<T>> queue = new ArrayBlockingQueue<Option<T>>(CAP);
Object token = new Object();
final CommunicationChannel<T> channel
= new CommunicationChannel<T>(queue, token);
new Thread(new Runnable()
{
@Override
public void run()
{
channels.set(channel);
provideElements();
enqueueSentinel();
}
}).start();
return new ConcurrentIterator<T>(queue, token);
}
protected abstract void provideElements();
protected final boolean iterationAborted()
{
return channels.get().iterationAborted();
}
protected final void provideElement(T element)
{
enqueue(Option.some(element));
}
private void enqueueSentinel()
{
enqueue(Option.<T> none());
}
private void enqueue(Option<T> element)
{
try
{
while (!offer(element))
{
System.gc();
}
}
catch (InterruptedException ignore)
{
ignore.printStackTrace();
}
}
private boolean offer(Option<T> element) throws InterruptedException
{
CommunicationChannel<T> channel = channels.get();
return channel.iterationAborted()
|| channel.queue.offer(element, 1, TimeUnit.SECONDS);
}
}
CommunicationChannel.java
import java.lang.ref.WeakReference;
import java.util.concurrent.BlockingQueue;
public class CommunicationChannel<T>
{
public final BlockingQueue<Option<T>> queue;
private final WeakReference<Object> token;
public CommunicationChannel(BlockingQueue<Option<T>> queue, Object token)
{
this.queue = queue;
this.token = new WeakReference<Object>(token);
}
public boolean iterationAborted()
{
return token.get() == null;
}
}
ConcurrentIterator.java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
public class ConcurrentIterator<T> implements Iterator<T>
{
private final BlockingQueue<Option<T>> queue;
@SuppressWarnings("unused")
private final Object token;
private Option<T> next;
public ConcurrentIterator(BlockingQueue<Option<T>> queue, Object token)
{
this.queue = queue;
this.token = token;
}
@Override
public boolean hasNext()
{
if (next == null)
{
try
{
next = queue.take();
}
catch (InterruptedException ignore)
{
ignore.printStackTrace();
}
}
return next.present;
}
@Override
public T next()
{
if (!hasNext()) throw new NoSuchElementException();
T result = next.value;
next = null;
return result;
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
}
Option.java
public class Option<T>
{
public final T value;
public final boolean present;
private Option(T value, boolean present)
{
this.value = value;
this.present = present;
}
public static <T> Option<T> some(T value)
{
return new Option<T>(value, true);
}
@SuppressWarnings("unchecked")
public static <T> Option<T> none()
{
return none;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private static final Option none = new Option(null, false);
}
Дайте мне знать, что вы думаете!