Pyspark - печатать сообщения от Kafka - PullRequest
0 голосов
/ 18 ноября 2018

Я установил систему kafka с производителем и потребителем, передавая в виде сообщений строки файла json.

Используя pyspark, мне нужно проанализировать данные для разных потоковых окон.Для этого мне нужно взглянуть на данные, так как они передаются в pyspark ... Как я могу это сделать?

Для запуска кода я использовал Docker Яннаэля.Вот мой код Python:

# Add dependencies and load modules
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.ui.port=4040 --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0,com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3 pyspark-shell'

from kafka import KafkaConsumer
from random import randint
from time import sleep

# Load modules and start SparkContext  
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
conf = SparkConf() \
    .setAppName("Streaming test") \
    .setMaster("local[2]") \
    .set("spark.cassandra.connection.host", "127.0.0.1")

try:
    sc.stop()
except:
    pass    

sc = SparkContext(conf=conf) 
sqlContext=SQLContext(sc)
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Create streaming task
ssc = StreamingContext(sc, 0.60)
kafkaStream = KafkaUtils.createStream(ssc, "127.0.0.1:2181", "spark-streaming-consumer", {'test': 1})
ssc.start()

1 Ответ

0 голосов
/ 19 ноября 2018

Вы можете либо позвонить kafkaStream.pprint(), либо узнать больше о структурированной потоковой передаче , и вы можете печатать так:

query = kafkaStream \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()

Я вижу, что у вас есть конечные точки, поэтому, предполагая, что вы пишете в Cassandra, вы можете использовать Kafka Connect вместо того, чтобы писать код Spark для этого

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...