dask: разделяемая память в параллельной модели - PullRequest
0 голосов
/ 17 ноября 2018

Я прочитал документацию по дневникам, блоги и ТАК, но я все еще не до конца уверен, как это сделать. Мой вариант использования:

  • У меня около 10 ГБ справочных данных. После загрузки они доступны только для чтения. Обычно мы загружаем их в рамки данных Dask / Pandas
  • Мне нужны эти ref-данные для обработки (обогащения, изменения, преобразования) около 500 миллионов событий в день (несколько файлов)
  • «Процесс» представляет собой конвейер из около 40 задач. Последовательность выполнения является релевантной (зависимости).
  • Каждая отдельная задача не сложна и не занимает много времени, в основном поиск, обогащение, сопоставление и т. Д.
  • Нет никаких зависимостей между событиями. Теоретически я могу обработать каждое событие отдельным потоком, объединить вывод в один файл, и все готово. Выходные события даже не должны быть в том же порядке, что и входные события.

В итоге:

  • мы можем массово парализовать обработку событий
  • Каждому параллельному потоку требуются одинаковые 10 ГБ (необработанных) ref-данных
  • Обработка отдельного события означает применение к ним последовательности / конвейера из 40 задач
  • Каждая отдельная задача не отнимает много времени (прочитайте ref-данные и измените событие)

Возможные подводные камни / проблемы:

  • тратить больше времени на сериализацию / десериализацию, а не на обработку данных (мы испытывали это в некоторых наших испытаниях, в которых использовались подходы, подобные конвейеру) *
  • ref-данные загружаются несколько раз, по одному на каждый (параллельный) процесс
  • желательно, чтобы я разработал / протестировал его на своем ноутбуке, но у меня недостаточно памяти для загрузки ref-данных. Может быть, если решение будет использовать memory_maps?

Наиболее эффективным решением, по-видимому, является то, что если бы мы смогли загрузить данные ref в память только один раз, сделать их доступными только для чтения для нескольких других процессов, обрабатывающих события

Масштабирование до нескольких компьютеров путем загрузки ref-данных на каждом компьютере. Передайте имена файлов на компьютеры для выполнения.

Есть идеи, как этого добиться?

Большое спасибо за вашу помощь

Ответы [ 2 ]

0 голосов
/ 22 ноября 2018

Я нашел сообщение в блоге о (python) Ray framework.Несмотря на то, что бизнес-цели Рэй сильно различаются, они столкнулись с одинаковыми основными требованиями: кадры данных с общей памятью только для чтения, используемые многими параллельными процессами.Они описывают и объясняют, почему они остановились на Apache Arrow и pyarrow.Звучит интересно, и мы попробуем для нашего варианта использования.

0 голосов
/ 18 ноября 2018

Некоторые вещи, о которых вы можете подумать

  • каждый рабочий процесс dask может иметь любое количество потоков. Совместное использование данных между потоками не требует копирования, но совместное использование между процессами; поэтому вы должны поэкспериментировать со смесью процесс / нить, чтобы найти оптимальный для вас

  • обычно лучше загружать данные в рабочих, чем передавать их от клиента, даже если репликация между процессами довольно эффективна. Если у вас есть память для сохранения ref-данных для каждого работника, это, очевидно, лучше, хотя Dask старается изо всех сил учитывать общие промежуточные зависимости для задач.

  • каждая задача вносит некоторые накладные расходы и может привести к перемещению промежуточных продуктов с одного компьютера на другой. Хотя некоторые линейные цепочки процессов могут быть объединены во время оптимизации, вам, вероятно, лучше написать функцию, которая последовательно вызывает ваши этапы из функции, и вызывать эту функцию как одну задачу, один раз для каждой части ваших данных.

Пример

f = client.submit(read_function, ref_filename)
out = client.map(process_function, list_of_inputs, ref=f)

где process_function в этом примере принимает один вход (который может быть кортежем) и ref= необязательный вход, который является загруженными ссылочными данными. Dask будет тиражировать справочные данные для работников по мере необходимости.

...