У меня есть некоторый код, применяющий функцию карты на сумке для сумок. Мне нужен поисковый словарь, чтобы применить эту функцию, и он не работает с client.scatter.
Я не знаю, правильно ли я поступаю, потому что рабочие начинают, но они ничего не делают. Я пробовал разные конфигурации, глядя на разные примеры, но я не могу заставить его работать. Любая поддержка будет оценена.
Я знаю из Spark, что вы определяете широковещательную переменную и получаете доступ к содержимому с помощью variable.value внутри функции, которую вы хотите применить. Я не вижу то же самое с DASK.
# Function to map
def transform_contacts_add_to_historic_sin(data,historic_dict):
raw_buffer = ''
line = json.loads(data)
if line['timestamp] > historic_dict['timestamp]:
raw_buffer = raw_buffer + line['vid']
return raw_buffer
# main program
# historic_dict is a dictionary previously filled, which is the lookup variable for map function
# file_records will be a list of json.dump getting from a S3 file
from distributed import Client
client = Client()
historic_dict_scattered = client.scatter(historic_dict, broadcast=True)
file_records = []
raw_data = s3_procedure.read_raw_file(... S3 file.......)
data = TextIOWrapper(raw_data)
for line in data:
file_records.append(line)
bag_chunk = db.from_sequence(file_records, npartitions=16)
bag_transform = bag_chunk.map(lambda x: transform_contacts_add_to_historic(x), args=[historic_dict_scattered])
bag_transform.compute()