Я обучил нейронную сеть, используя Keras, и я сохранил ее, используя save_model. Я хочу использовать модель для прогнозирования в реальном времени при потоковой передаче с использованием Python. Я использую load_model в драйвере, я передаю модель исполнителям и затем вызываю предикат для каждого пакета для каждого сообщения.
embeddings_index = load_word_embeddings()
model = load_model("/path/to/model")
conf = SparkConf().setMaster("local[*]").setAppName("SparkStreamingFromKafka")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
spark = SparkSession(sc)
# Creating a streaming context with batch interval of 10 sec
ssc = StreamingContext(sc, 10)
ssc.checkpoint("checkpoint")
modelBroadcast = sc.broadcast(model)
embeddingsBroadcast = sc.broadcast(embeddings_index)
kafkaStream = KafkaUtils.createDirectStream(ssc, topics = ['first_topic'], kafkaParams = {"metadata.broker.list": 'localhost:9092'})
dataStream = kafkaStream.map(lambda x: json.loads(x[1]))
dataStreamMessage = dataStream.map(lambda x: x["textContent"])
dataStreamMessageData = dataStreamMessage.map(lambda message: [message, modelBroadcast.value.predict(...)]).
Предыдущий код приводит к «Сбой запуска Blas GEMM», что вызвано несколькими экземплярами исполняемого файла python, запущенными одновременно на графическом процессоре. Почему при использовании функции предиката внутри функции карты создаются 2 идентичных процесса? Кроме того, подсказка «using tenorflow backend» также отображается дважды.