Я хочу использовать предварительно встроенную модель встраивания (fasttext) в приложении pyspark.
Поэтому, если я передаю файл (.bin), выдается следующее исключение: Traceback (последний вызов был последним):
cPickle.PicklingError: Could not serialize broadcast: OverflowError: cannot serialize a string larger than 2 GiB
Вместо этого я попытался использовать sc.addFile(modelpath)
где modelpath=path/to/model.bin
следующим образом:
я создаю файл с именем fasttextSpark.py
import gensim
from gensim.models.fasttext import FastText as FT_gensim
# Load model (loads when this library is being imported)
model = FT_gensim.load_fasttext_format("/project/6008168/bib/wiki.en.bin")
# This is the function we use in UDF to predict the language of a given msg
def get_vector(msg):
pred = model[msg]
return pred
и testSubmit.sh:
#!/bin/bash
#SBATCH -N 2
#SBATCH -t 00:10:00
#SBATCH --mem 20000
#SBATCH --ntasks-per-node 1
#SBATCH --cpus-per-task 32
module load python/2.7.14
source "/project/6008168/bib/ENV2.7.14/bin/activate"
module load spark/2.3.0
spark-submit /project/6008168/bib/test.py
и test.py:
from __future__ import print_function
import sys
import time
import math
import csv
import datetime
import StringIO
import pyspark
import gensim
from operator import add
from pyspark.sql import *
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from gensim.models.fasttext import FastText as FT_gensim
appName = "bib"
modelpath = "/project/6008168/bib/wiki.en.bin"
conf = (SparkConf()
.setAppName(appName)
.set("spark.executor.memory", "12G")
.set("spark.network.timeout", "800s")
.set("spark.executor.heartbeatInterval", "20s")
.set("spark.driver.maxResultSize", "12g")
.set("spark.executor.instances", 2)
.set("spark.executor.cores", 30)
)
sc = SparkContext(conf = conf)
#model = FT_gensim.load_fasttext_format(modelpath)
sc.addFile(modelpath)
sc.addPyFile("/project/6008168/bib/fasttextSpark.py")
# Import our custom fastText language classifier lib
import fasttextSpark
print ("nights = ", fasttextSpark.get_vector("nights"))
print ("done")
Теперь у каждого узла будет копия предварительно обученного набора данных.У некоторых слов нет словарного запаса, поэтому каждый раз, когда я сталкиваюсь с такими словами, я хочу создать для него случайный, но фиксированный вектор и добавить слово и его вектор в словарь.
Итак, как мне поддерживать такой словарь в каждом узле?
Действительно, предположим, что мой rdd следующий: my_rdd = (id, предложение), и я хочу найти вектор вложенияпредложение, суммируя векторы его слов.Сколько раз будет загружена модель внедрения.Например:
предположим, rdd=("id1", "motorcycle parts")
, загружает ли моя реализация модель два раза: один для мотоцикла и один для деталей?если да, мой подход неэффективен?В этом случае, каковы должны быть лучшие подходы, которые будут применяться?