Как загрузить файл в каждого исполнителя один раз? - PullRequest
0 голосов
/ 05 февраля 2019

Я определяю следующий код для загрузки предварительно встроенной модели встраивания:

import gensim

from gensim.models.fasttext import FastText as FT_gensim
import numpy as np

class Loader(object):
    cache = {}
    emb_dic = {}
    count = 0
    def __init__(self, filename):
        print("|-------------------------------------|")
        print ("Welcome to Loader class in python")
        print("|-------------------------------------|")
        self.fn = filename

    @property
    def fasttext(self):
        if Loader.count == 1:
                print("already loaded")
        if self.fn not in Loader.cache:
            Loader.cache[self.fn] =  FT_gensim.load_fasttext_format(self.fn)
            Loader.count = Loader.count + 1
        return Loader.cache[self.fn]


    def map(self, word):
        if word not in self.fasttext:
            Loader.emb_dic[word] = np.random.uniform(low = 0.0, high = 1.0, size = 300)
            return Loader.emb_dic[word]
        return self.fasttext[word]

я называю этот класс следующим образом:

inputRaw = sc.textFile(inputFile, 3).map(lambda line: (line.split("\t")[0], line.split("\t")[1])).map(Loader(modelpath).map)
  1. Я путаюсь Сколько разфайл пути к модели будет загружен?Я хочу, чтобы один раз был загружен на каждого исполнителя и использовался всеми его ядрами.Мой ответ на этот вопрос - путь к модели будет загружен 3 раза (= количество разделов).Если мой ответ правильный, недостаток такого моделирования связан с размером пути к файлу модели.Предположим, этот файл 10 ГБ, и предположим, у меня есть 200 разделов.Таким образом, в этом случае нам потребуется 10 * 200 ГБ = 2000 с огромным (Это решение может работать только с небольшим количеством разделов.)

Предположим, у меня есть rdd =(id, sentence) =[(id1, u'patina californian'), (id2, u'virgil american'), (id3', u'frensh'), (id4, u'american')]

и я хочу суммировать векторы встраиваемых слов для каждого предложения:

def test(document):
    print("document is = {}".format(document))
    documentWords = document.split(" ")
    features = np.zeros(300)
    for word in documentWords:
        features = np.add(features, Loader(modelpath).fasttext[word])
    return features

def calltest(inputRawSource):

    my_rdd = inputRawSource.map(lambda line: (line[0], test(line[1]))).cache()
    return my_rdd

В этом случае сколько раз будет загружен файл modelpath?Обратите внимание, что я установил spark.executor.instances" to 3

1 Ответ

0 голосов
/ 10 февраля 2019

По умолчанию количество разделов установлено равным общему количеству ядер на всех узлах-исполнителях в кластере Spark.Предположим, что вы обрабатываете 10 ГБ в кластере Spark (или суперкомпьютерном исполнителе), который содержит в общей сложности 200 ядер ЦП, что означает, что Spark может использовать 200 разделов по умолчанию для обработки ваших данных.

Кроме того, чтобы сделатьвсе ядра вашего процессора работают на каждого исполнителя, это можно решить в python (используя 100% всех ядер с модулем многопроцессорной обработки).

...