У меня есть модель Tensorflow, которую я хочу запускать (не тренировать) на моем Dask Dataframe. Я использую map_partitions
. Однако, когда я смотрю на панель мониторинга, чтобы проверить прогресс, она выполняет только 1 задачу для всей работы. Я ожидал, что он будет обрабатывать разделы одновременно. Что я делаю не так?
Запустите мой локальный кластер:
cluster = LocalCluster(ip="0.0.0.0")
client=Client(cluster)
ddf = dd.read_csv("data/docs", names=["docs"])
Датафреймы ddf
представляют собой набор предложений (строк) и имеют 9 разделов.
Вот модель TF:
def encode_factory(sess):
output_tensor_names_sorted = ["input_layer/concat:0"]
loader.load(sess, 'serve', export_path)
def encode(sentence):
#encodes string as `Example` protobuff
serialized_examples = make_examples(sentence, "word")
inputs_feed_dict = {"input_example_tensor:0": serialized_examples}
outputs = sess.run(output_tensor_names_sorted,
feed_dict=inputs_feed_dict)
return outputs[0][0]
return encode
Функция encode_factory
принимает объект Tensorflow Session
и загружает модель TF с export_path
(диск). Функция возвращает замыкание, которое принимает предложение (текстовую строку) в качестве входных данных и возвращает кодировку предложения (массив встраивания / плавающей запятой).
Я регистрирую его как будущее:
future_fn = client.scatter(encode_factory, broadcast=True)
Затем я определяю свою функцию отображения:
def map_fn(pdf, encoder):
#create instance of TF model encoder
encode = encoder(tf.Session())
embedded_docs = []
#iterate through items in Pandas Dataframe
for doc in pdf.docs:
doc_embedding = encoder(doc) #pass sentence to TF model
embedded_docs.append(str(doc_embedding))
pdf["encoding"] = embedded_docs
return pdf
и применяю карту к разделам:
ddf.map_partitions(map_fn, future_fn, meta={'docs': str, 'encoding': str}).head()
Как мне достичь некоторого параллелизма, работает только 1 работник!