Я реализую страничный сплитератор (на Java), который должен разрешать параллельный доступ.
У меня есть следующий тестовый пример (тест в Groovy со Споком):
def 'parallel, two pages'()
{
when: 'a sorted range from 0 to 6'
def fetcher = new IntegerRangePageFetcher(6)
and: 'a spliterator with a page size of 5'
def spliterator = new PagedSpliterator(fetcher, 5)
and: 'a stream with the given range is collected to a list'
def result = StreamSupport
.stream(spliterator, true)
.collect(Collectors.toList())
then: 'the sort order is obeyed'
expect result, contains(0, 1, 2, 3, 4, 5)
}
Этот тестовый случай завершается с ошибкой:
Condition not satisfied:
expect result, contains(0, 1, 2, 3, 4, 5)
| |
false [5, 0, 1, 2, 3, 4]
Expected: iterable containing [<0>, <1>, <2>, <3>, <4>, <5>]
but: item 0: was <5>
Сплитератор имеет characteristics()
return IMMUTABLE | ORDERED | SIZED | SUBSIZED | NONNULL;
Код работает, когда я не использую параллель. Так что я не понимаю ORDERED
:
- Если он установлен, должна ли потоковая структура гарантировать порядок и должна сортировать результаты при использовании параллельно генерируемых кусков? Если да, то почему не сортировать это в моем случае?
- Или у меня есть ошибка в моей реализации
trySplit
, и мне нужно выполнить разбиение в соответствии с заданным порядком? (в настоящее время я разделяю середину открытых страниц, 0-mid остается у текущего сплитератора, средний конец переходит во вновь созданный сплитератор)
- Или мне следует позвонить
sort()
до collect()
, потому что фреймворк вообще не гарантирует какой-либо порядок?
--- Обновление на основе обратной связи ---
Спасибо за ваши ответы, в моем коде есть две логические ошибки. Сначала запрашиваемые фрагменты:
@Override
public Spliterator<T> trySplit()
{
// first query
if (pageIterator == null) {
pageIterator = pageFetcher.fetchNextPage(paginationInfo);
}
// delegate split decision
var newPaginationInfo = paginationInfo.split();
if (newPaginationInfo == null) {
log.info("* Spliterator returns null");
return null;
}
// now we split
var newSpliterator = new PagedSpliterator<>(pageFetcher, newPaginationInfo);
return newSpliterator;
}
public PaginationInfo split()
{
// when open range or nothing left we don't split
if ((endElementIndex == -1) || !hasNextPage()) {
return null;
}
// calculate the splitting position
var firstHalfPages = (getEndPageIndex() - getNextPageIndex()) / 2;
var midElementIndex = (getNextPageIndex() + firstHalfPages) * pageSize;
// create an additional PaginationInfo and set the ranges according to the split position
var newPaginationInfo = new PaginationInfo(this);
newPaginationInfo.firstElementOnNextPageIndex = midElementIndex;
newPaginationInfo.nextElementIndex = midElementIndex;
endElementIndex = midElementIndex;
return newPaginationInfo;
}
Первая ошибка:
Вновь созданный Spliterator устанавливается во второй половине диапазона вместо первого. Я прочитал о префиксе в документации, но мне это кажется очень неуклюжим. Я разделил на размер страницы, чтобы иметь несколько параллельных запросов. В начале (первый экземпляр spliterator) мне нужно выбрать первую страницу, чтобы получить счетчик страниц и элементов. Поэтому, чтобы исправить проблему с заказом, я должен раздать извлеченные данные из первого сплитератора второму сплитератору, чтобы выполнить заказ, это кажется мне очень странным и не интуитивным.
Вторая ошибка:
// first query
if (pageIterator == null) {
pageIterator = pageFetcher.fetchNextPage(paginationInfo);
}
Все последующие созданные сплитераторы получат вызовы estimateSize()
и trySplit()
от платформы. Во время этих вызовов в настоящее время я выбираю страницу, но это блокирует параллелизм, выборка должна произойти позже в вызове tryAdvance()
.
Я внесу эти изменения и вернусь к вам.