Изменение конфигурации во время выполнения для PySpark - PullRequest
0 голосов
/ 26 апреля 2020

Я пытался развернуть обученный индекс Фейсса в PySpark и выполнить распределенный поиск. Таким образом, весь процесс включает в себя:

  1. Предварительная обработка
  2. Загрузка индекса Faiss (~ 15G) и выполнение поиска Faiss
  3. Постобработка и запись в HDFS

Я установил ЦП для каждой задачи равным 10 (spark.task.cpus=10) для выполнения многопоточного поиска. Но шаг 1 и шаг 3 могут использовать только 1 процессор на задачу. Чтобы использовать все процессоры, я хочу установить spark.task.cpus=1 до шага 1 и 3. Я попытался установить метод RuntimeConfig, но, похоже, моя программа застряла. Любой совет, как изменить конфигурацию во время выполнения или как оптимизировать эту проблему?

Пример кода:

def load_and_search(x, model_path):
    faiss_idx = faiss.read_index(model_path)
    q_vec = np.concatenate(x)
    _, idx_array = faiss_idx.search(q_vec, k=10)
    return idx_array


data = sc.textFile(input_path)

# preprocess, only used one cpu per task
data = data.map(lambda x: x)

# load faiss index and search, used multiple cpus per task
data = data.mapPartitioins(lambda x: load_and_search(x, model_path))

# postprocess and write, one cpu per task
data = data.map(lambda x: x).saveAsTextFile(result_path)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...