Цель: Непрерывная подача сетевых пакетов в Kafka Producer, подключая их к Spark Streaming, чтобы иметь возможность обрабатывать данные пакетов, после чего, используя предварительно обработанные данные в Tensorflow или Keras.
Я обрабатываю непрерывные данные в Spark Streaming (PySpark), который исходит от Kafka, и теперь я хочу отправить обработанные данные в Tensorflow.Как я могу использовать эти Трансформированные DStreams в Tensorflow с Python?Спасибо.
В настоящее время обработка в Spark Streaming не выполняется, но будет добавлена позже.Вот код py:
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.conf import SparkConf
from datetime import datetime
if __name__ == '__main__':
sc = SparkContext(appName='Kafkas')
ssc = StreamingContext(sc, 2)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic],
{'metadata.broker.list': brokers})
lines = kvs.map(lambda x: x[1])
lines.pprint()
ssc.start()
ssc.awaitTermination()
Также я использую это для запуска потокового воспроизведения:
spark-submit --packages org.apache.spark:spark-streaming-kafka-0–8_2.11:2.0.0
spark-kafka.py localhost:9092 topic