Как использовать Dask с пользовательскими классами - PullRequest
1 голос
/ 09 ноября 2019

Я строю распределенный пространственный индекс в Python с расширением C ++. Я пытаюсь использовать Dask (впервые) вместо MPI. Текущая реализация работает, но имеет несколько проблем.

  1. Критическая часть - большие издержки по сравнению с последовательным выполнением. Я ожидал бы почти линейного ускорения от алгоритма.

  2. По-видимому, Persist не выполняет то, что я ожидаю, поскольку время первого запроса намного больше, чем во втором.

  3. для моих глаз код выглядит недиоматичным, но, как уже говорилось, я новичок в Dask. Есть ли лучший способ сделать это?

  4. Мне нужно сделать некоторую индексацию в отложенных объектах при использовании 2d чанков. Это не происходит с 1d кусками и выглядит странно.

Алгоритм создает список сортировки частиц и затем строит октодерево, где узлы ссылаются на непрерывные блоки частиц в отсортированном массиве с помощьюпара (начало, конец) индексов. Запросы занимают граничную рамку и ищут перекрывающиеся узлы в октодереве, собирая частицы, находящиеся в граничной рамке, из полученных кандидатов. И построение, и запросы являются чисто последовательными.

Распараллеливание выполняется путем случайного разделения частиц, построения дерева в каждом подмножестве. Запросы транслируются по всем подиндексам, а результаты объединяются. Даск чувствовал себя здесь как натуральный. Я использую persist для генерации индекса один раз и сохраняю его, поскольку ожидаю много запросов на индекс.

Я пробовал map_blocks, но, похоже, это работает только для преобразований массива в массив. Кроме того, были опробованы различные варианты перестановки / вычисления.

from dask.delayed import delayed
from dask.distributed import LocalCluster, Client
import dask.array as da

# My custom octree index
from pyoctree import Index

# For reference, this runs on a 10 Core x 8 Thread cluster (POWER8)
cluster = LocalCluster()
client = Client()

# Point count
N = 100_000_000
# Dimensionality
D = 3
# Chunk Count
K = 10
# Octree Levels
L = 5

# Delayed adapters for Dask
@delayed
def index(xs):
    """Create index"""
    return Index(L, xs)

@delayed
def query(index, ll, ur):
    """Bounding box query between lower left and upper right"""
    return index.query(*ll, *ur)

# Random input: N random points in 3d
array   = da.random.random((N, 3), chunks=(N//K,3)).astype(np.float32)
# Split array
splits  = array.to_delayed() 
# Create index
indices = [index(split[0]).persist() for split in splits]
# This takes roughly 100ms

# Execute a query
queries = [query(index, (0.25, 0.25, 0.25), (0.35, 0.35, 0.35)) for index in indices]
# Merge results
parts   = [da.from_delayed(query, shape=(np.nan,3), dtype=np.float32) for query in queries]
parts   = [part.compute_chunk_sizes() for part in parts]
final   = da.concatenate(parts)
result  = final.compute()
# This takes 100s the first time and 800ms afterwards

Последовательное выполнение с одинаковыми параметрами занимает 240 секунд для построения индекса и 6 мс для одного запроса. С Dask это 100-секундное строительство и 800 мс на запрос. Увеличивая число частиц N в 10 раз, я получаю 11 с на запрос и 240 с, чтобы построить индекс. Таким образом, кажется, что Даск добавляет суровое наказание, которое масштабируется в N.

Я благодарен за любые подсказки.

С уважением

1 Ответ

0 голосов
/ 09 ноября 2019

Трудно сказать без профилирования, но я предполагаю, что вы включаете свой пространственный индекс в каждую задачу, и поэтому вам мешает чрезмерная сериализация.

Я рекомендую прочитать этот документ, чтобы узнать, какЧтобы определить затраты: https://docs.dask.org/en/latest/phases-of-computation.html

И этот документ, чтобы узнать о том, как избежать сериализации больших объектов: https://docs.dask.org/en/latest/delayed-best-practices.html#avoid-repeatedly-putting-large-inputs-into-delayed-calls

...