Как я могу поддерживать временный словарь в приложении pyspark? - PullRequest
0 голосов
/ 28 января 2019

Я хочу использовать предварительно встроенную модель встраивания (fasttext) в приложении pyspark.

Поэтому, если я передаю файл (.bin), выдается следующее исключение: Traceback (последний вызов был последним):

cPickle.PicklingError: Could not serialize broadcast: OverflowError: cannot serialize a string larger than 2 GiB

Вместо этого я попытался использовать sc.addFile(modelpath) где modelpath=path/to/model.bin следующим образом:

я создаю файл с именем fasttextSpark.py

import gensim
from gensim.models.fasttext import FastText as FT_gensim
# Load model (loads when this library is being imported)
model = FT_gensim.load_fasttext_format("/project/6008168/bib/wiki.en.bin")

# This is the function we use in UDF to predict the language of a given msg
def get_vector(msg):
    pred = model[msg]
    return pred

и testSubmit.sh:

#!/bin/bash
#SBATCH -N 2
#SBATCH -t 00:10:00
#SBATCH --mem 20000
#SBATCH --ntasks-per-node 1
#SBATCH --cpus-per-task 32
module load python/2.7.14
source "/project/6008168/bib/ENV2.7.14/bin/activate"
module load spark/2.3.0
spark-submit /project/6008168/bib/test.py

и test.py:

from __future__ import print_function
import sys
import time
import math
import csv
import datetime
import StringIO
import pyspark
import gensim
from operator import add
from pyspark.sql import *
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from gensim.models.fasttext import FastText as FT_gensim
appName = "bib"
modelpath = "/project/6008168/bib/wiki.en.bin"
conf = (SparkConf()
         .setAppName(appName)
         .set("spark.executor.memory", "12G")
         .set("spark.network.timeout", "800s")
         .set("spark.executor.heartbeatInterval", "20s")
         .set("spark.driver.maxResultSize", "12g")
         .set("spark.executor.instances", 2)
         .set("spark.executor.cores", 30)
         )
sc = SparkContext(conf = conf)
#model = FT_gensim.load_fasttext_format(modelpath)
sc.addFile(modelpath)
sc.addPyFile("/project/6008168/bib/fasttextSpark.py")

# Import our custom fastText language classifier lib
import fasttextSpark
print ("nights = ", fasttextSpark.get_vector("nights"))
print ("done")

Теперь у каждого узла будет копия предварительно обученного набора данных.У некоторых слов нет словарного запаса, поэтому каждый раз, когда я сталкиваюсь с такими словами, я хочу создать для него случайный, но фиксированный вектор и добавить слово и его вектор в словарь.

Итак, как мне поддерживать такой словарь в каждом узле?

Действительно, предположим, что мой rdd следующий: my_rdd = (id, предложение), и я хочу найти вектор вложенияпредложение, суммируя векторы его слов.Сколько раз будет загружена модель внедрения.Например:

предположим, rdd=("id1", "motorcycle parts"), загружает ли моя реализация модель два раза: один для мотоцикла и один для деталей?если да, мой подход неэффективен?В этом случае, каковы должны быть лучшие подходы, которые будут применяться?

1 Ответ

0 голосов
/ 01 февраля 2019

Переменные модуля в Python оцениваются один раз, когда модуль загружается.Таким образом, переменная будет загружаться один раз для каждого интерпретатора и поддерживаться в активном состоянии до тех пор, пока поддерживается интерпретатор.

Однако рабочие процессы Spark не разделяют память, поэтому для каждого рабочего процесса будет одна копия словаря.То же самое было бы верно, если бы у вас была широковещательная переменная.

Таким образом, ваше текущее решение максимально приближено к тому, что вы хотите, без использования низкоуровневых примитивов (таких как отображение памяти) или внешнего хранилища.

...