Наша компания в настоящее время использует prefect.io для обработки данных (ELT, генерация отчетов, ML и др. c). Мы только начали добавлять возможность выполнять параллельное выполнение задач, которое работает на Dask . Наши потоки выполняются с использованием эфемерных AWS Fargate контейнеров, которые будут использовать Dask LocalCluster с определенным количеством рабочих, потоков, процессов, переданных в объект LocalCluster.
Наше путешествие по Dask будет выглядят очень похоже на это:
- Продолжайте использовать LocalCluster для одной машины, пока мы не увеличим максимально допустимое количество процессоров / память
- Когда мы вырастим один контейнер, появятся дополнительные рабочие контейнеры на начальном этапе. контейнер (а-ля dask-kubernetes ) и присоедините их к LocalCluster.
В настоящее время мы начинаем с контейнеров, которые имеют 256 процессоров (.25 vCPU) и 512 памяти и прикрепление LocalCluster к 1 n_workers и 3 threads_per_worker для получения разумного количества параллелизма. Тем не менее, это действительно догадка. 1 n_workers, так как это машина с менее чем 1 vcpu и 3 потоками, потому что это не кажется мне безумным, основываясь на моем предыдущем опыте запуска других приложений на основе python в Fargate. Кажется, это работает нормально в очень простом примере, который просто сопоставляет функцию со списком элементов.
RENEWAL_TABLES = [
'Activity',
'CurrentPolicyTermStaus',
'PolicyRenewalStatus',
'PolicyTerm',
'PolicyTermStatus',
'EndorsementPolicyTerm',
'PolicyLifeState'
]
RENEWAL_TABLES_PAIRS = [
(i, 1433 + idx) for idx, i in enumerate(RENEWAL_TABLES)
]
@task(state_handlers=[HANDLER])
def dummy_step():
LOGGER.info('Dummy Step...')
sleep(15)
@task(state_handlers=[HANDLER])
def test_map(table):
LOGGER.info('table: {}...'.format(table))
sleep(15)
with Flow(Path(__file__).stem, SCHEDULE, state_handlers=[HANDLER]) as flow:
first_step = dummy_step()
test_map.map(RENEWAL_TABLES_PAIRS).set_dependencies(upstream_tasks=[first_step])
Я вижу не более 3 задач, выполняемых одновременно.
Я бы действительно хотелось бы понять, как лучше настроить n_workers (отдельная машина), потоки, процессы, поскольку мы расширяем размер одной машины до добавления удаленных работников. Я знаю, что это зависит от моей рабочей нагрузки, но вы могли видеть комбинацию вещей в одном потоке, когда одна задача выполняет извлечение из базы данных в CSV, а другая задача выполняет вычисление pandas. В Интернете я видел такие вещи, где кажется, что это должны быть потоки = количество процессоров, запрошенных для документации, но кажется, что вы все равно можете достичь параллелизма с менее чем одним процессором в Fargate.
Любая обратная связь приветствуется и может помочь другим, желающим использовать Dask в более эфемерной природе.
Учитывая, что Fargate увеличивает с .25 -> .50 -> 1 -> 2 -> 4 для vCPU, я думаю, что безопасно go с настройкой 1 рабочий на 1 vcpu. Однако было бы полезно понять, как выбрать хороший верхний предел для числа потоков на одного работника, учитывая, как работает распределение Fargate vcpu.