Как интегрировать Spark Streaming с Tensorflow? - PullRequest
0 голосов
/ 18 декабря 2018

Цель: Непрерывная подача сетевых пакетов в Kafka Producer, подключая их к Spark Streaming, чтобы иметь возможность обрабатывать данные пакетов, после чего, используя предварительно обработанные данные в Tensorflow или Keras.

Я обрабатываю непрерывные данные в Spark Streaming (PySpark), который исходит от Kafka, и теперь я хочу отправить обработанные данные в Tensorflow.Как я могу использовать эти Трансформированные DStreams в Tensorflow с Python?Спасибо.

В настоящее время обработка в Spark Streaming не выполняется, но будет добавлена ​​позже.Вот код py:

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.conf import SparkConf
from datetime import datetime

if __name__ == '__main__':
    sc = SparkContext(appName='Kafkas')
    ssc = StreamingContext(sc, 2)
    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], 
                                       {'metadata.broker.list': brokers})
    lines = kvs.map(lambda x: x[1])
    lines.pprint()
    ssc.start()
    ssc.awaitTermination()

Также я использую это для запуска потокового воспроизведения:

spark-submit --packages org.apache.spark:spark-streaming-kafka-0–8_2.11:2.0.0 
spark-kafka.py localhost:9092 topic

1 Ответ

0 голосов
/ 19 декабря 2018

У вас есть два способа решения вашей проблемы:

  1. После того как вы обработали свои данные, вы можете сохранить их, а затем независимо запустить свою модель (в Keras?).Просто создайте файл паркета / добавьте его, если он уже существует:

    if os.path.isdir(DATA_TREATED_PATH):
        data.write.mode('append').parquet(DATA_TREATED)
    else:
        data.write.parquet(DATA_TREATED_PATH)
    

И затем вы просто создаете свою модель с помощью keras / tenorflow и запускаете ее, как каждый час, может быть?Или столько раз, сколько вы хотите, чтобы он был обновлен.Так что каждый раз запускается с нуля.

Вы обрабатываете свои данные, сохраняете их, как и раньше, но после этого загружаете свою модель, обучаете новые данные / новую партию и затем сохраняете свою модель.Это называется онлайн-обучением, потому что вы не запускаете свою модель с нуля.
...