Как переменные вещания используются в распараллеливании dask - PullRequest
1 голос
/ 21 мая 2019

У меня есть некоторый код, применяющий функцию карты на сумке для сумок. Мне нужен поисковый словарь, чтобы применить эту функцию, и он не работает с client.scatter.

Я не знаю, правильно ли я поступаю, потому что рабочие начинают, но они ничего не делают. Я пробовал разные конфигурации, глядя на разные примеры, но я не могу заставить его работать. Любая поддержка будет оценена.

Я знаю из Spark, что вы определяете широковещательную переменную и получаете доступ к содержимому с помощью variable.value внутри функции, которую вы хотите применить. Я не вижу то же самое с DASK.

# Function to map
def transform_contacts_add_to_historic_sin(data,historic_dict):
    raw_buffer = ''
    line = json.loads(data)

    if line['timestamp] > historic_dict['timestamp]:
        raw_buffer = raw_buffer + line['vid']

    return raw_buffer

# main program
# historic_dict is a dictionary previously filled, which is the lookup variable for map function
# file_records will be a list of json.dump getting from a S3 file

from distributed import Client
client = Client()
historic_dict_scattered = client.scatter(historic_dict, broadcast=True)

file_records = []
raw_data = s3_procedure.read_raw_file(... S3 file.......)
data = TextIOWrapper(raw_data)
for line in data:
   file_records.append(line)

bag_chunk = db.from_sequence(file_records, npartitions=16)
bag_transform = bag_chunk.map(lambda x: transform_contacts_add_to_historic(x), args=[historic_dict_scattered])
bag_transform.compute()

1 Ответ

0 голосов
/ 23 мая 2019

Если ваш словарь маленький, вы можете просто включить его напрямую

def func(partition, d):
    return ...

my_dict = {...}

b = b.map(func, d=my_dict)

Если он большой, то, возможно, вы захотите обернуть его в Dask с задержкой сначала

my_dict = dask.delayed(my_dict)

b = b.map(func, d=my_dict)

Если он оченьбольшой, тогда да, вы можете сначала разбросать его (хотя я бы избежал этого, если бы все получилось с любым из указанных выше подходов).

[my_dict] = client.scatter([my_dict])

b = b.map(func, d=my_dict)
...