Dask Map Tensorflow через разделы - PullRequest
       8

Dask Map Tensorflow через разделы

0 голосов
/ 12 февраля 2020

У меня есть модель Tensorflow, которую я хочу запускать (не тренировать) на моем Dask Dataframe. Я использую map_partitions. Однако, когда я смотрю на панель мониторинга, чтобы проверить прогресс, она выполняет только 1 задачу для всей работы. Я ожидал, что он будет обрабатывать разделы одновременно. Что я делаю не так?

Запустите мой локальный кластер:

cluster = LocalCluster(ip="0.0.0.0")
client=Client(cluster)

ddf = dd.read_csv("data/docs", names=["docs"])

Датафреймы ddf представляют собой набор предложений (строк) и имеют 9 разделов.

Вот модель TF:

def encode_factory(sess):

    output_tensor_names_sorted = ["input_layer/concat:0"]

    loader.load(sess, 'serve', export_path)

    def encode(sentence):
        #encodes string as `Example` protobuff
        serialized_examples = make_examples(sentence, "word")

        inputs_feed_dict = {"input_example_tensor:0": serialized_examples}

        outputs = sess.run(output_tensor_names_sorted,
                       feed_dict=inputs_feed_dict)
        return outputs[0][0]

    return encode

Функция encode_factory принимает объект Tensorflow Session и загружает модель TF с export_path (диск). Функция возвращает замыкание, которое принимает предложение (текстовую строку) в качестве входных данных и возвращает кодировку предложения (массив встраивания / плавающей запятой).

Я регистрирую его как будущее:

future_fn = client.scatter(encode_factory, broadcast=True) 

Затем я определяю свою функцию отображения:

def map_fn(pdf, encoder):
    #create instance of TF model encoder
    encode = encoder(tf.Session())

    embedded_docs = []

    #iterate through items in Pandas Dataframe
    for doc in pdf.docs:
        doc_embedding = encoder(doc) #pass sentence to TF model
        embedded_docs.append(str(doc_embedding))

    pdf["encoding"] = embedded_docs
    return pdf

и применяю карту к разделам:

ddf.map_partitions(map_fn, future_fn, meta={'docs': str, 'encoding': str}).head()

Как мне достичь некоторого параллелизма, работает только 1 работник!

Dask Dashboard

1 Ответ

0 голосов
/ 13 февраля 2020

Ответ, как я обнаружил, довольно прост. Поскольку я звоню head, а не compute Я полагаю, что в Dask используется только 1 рабочий ... Если один использует compute, задачи распределяются по рабочим.

...