Как правильно выбрать количество потоков, рабочих и процессов для Dask при работе в эфемерной среде как на одной машине, так и на кластере - PullRequest
2 голосов
/ 23 января 2020

Наша компания в настоящее время использует prefect.io для обработки данных (ELT, генерация отчетов, ML и др. c). Мы только начали добавлять возможность выполнять параллельное выполнение задач, которое работает на Dask . Наши потоки выполняются с использованием эфемерных AWS Fargate контейнеров, которые будут использовать Dask LocalCluster с определенным количеством рабочих, потоков, процессов, переданных в объект LocalCluster.

Наше путешествие по Dask будет выглядят очень похоже на это:

  1. Продолжайте использовать LocalCluster для одной машины, пока мы не увеличим максимально допустимое количество процессоров / память
  2. Когда мы вырастим один контейнер, появятся дополнительные рабочие контейнеры на начальном этапе. контейнер (а-ля dask-kubernetes ) и присоедините их к LocalCluster.

В настоящее время мы начинаем с контейнеров, которые имеют 256 процессоров (.25 vCPU) и 512 памяти и прикрепление LocalCluster к 1 n_workers и 3 threads_per_worker для получения разумного количества параллелизма. Тем не менее, это действительно догадка. 1 n_workers, так как это машина с менее чем 1 vcpu и 3 потоками, потому что это не кажется мне безумным, основываясь на моем предыдущем опыте запуска других приложений на основе python в Fargate. Кажется, это работает нормально в очень простом примере, который просто сопоставляет функцию со списком элементов.

RENEWAL_TABLES = [
'Activity',
'CurrentPolicyTermStaus',
'PolicyRenewalStatus',
'PolicyTerm',
'PolicyTermStatus',
'EndorsementPolicyTerm',
'PolicyLifeState'
]

RENEWAL_TABLES_PAIRS = [
    (i, 1433 + idx) for idx, i in enumerate(RENEWAL_TABLES)
]


@task(state_handlers=[HANDLER])
def dummy_step():
    LOGGER.info('Dummy Step...')
    sleep(15)


@task(state_handlers=[HANDLER])
def test_map(table):
    LOGGER.info('table: {}...'.format(table))
    sleep(15)


with Flow(Path(__file__).stem, SCHEDULE, state_handlers=[HANDLER]) as flow:
    first_step = dummy_step()
    test_map.map(RENEWAL_TABLES_PAIRS).set_dependencies(upstream_tasks=[first_step])

Я вижу не более 3 задач, выполняемых одновременно.

Я бы действительно хотелось бы понять, как лучше настроить n_workers (отдельная машина), потоки, процессы, поскольку мы расширяем размер одной машины до добавления удаленных работников. Я знаю, что это зависит от моей рабочей нагрузки, но вы могли видеть комбинацию вещей в одном потоке, когда одна задача выполняет извлечение из базы данных в CSV, а другая задача выполняет вычисление pandas. В Интернете я видел такие вещи, где кажется, что это должны быть потоки = количество процессоров, запрошенных для документации, но кажется, что вы все равно можете достичь параллелизма с менее чем одним процессором в Fargate.

Любая обратная связь приветствуется и может помочь другим, желающим использовать Dask в более эфемерной природе.

Учитывая, что Fargate увеличивает с .25 -> .50 -> 1 -> 2 -> 4 для vCPU, я думаю, что безопасно go с настройкой 1 рабочий на 1 vcpu. Однако было бы полезно понять, как выбрать хороший верхний предел для числа потоков на одного работника, учитывая, как работает распределение Fargate vcpu.

...