Я пытаюсь интегрировать Kafka с Spark-Structured-Streaming в PySpark в MongoDB Sink. Мне нужна помощь по исправлению моего кода, если я ошибаюсь
Получил интегрированные Kafka-PySpark и PySpark-Mon go. Сейчас пытаюсь интегрировать конвейер от Kafka-PySpark-Mon go
Я использую pyspark 2.4.5.
Это мой код:
spark = SparkSession.builder \
.appName("Spark Structured Streaming from Kafka") \
.getOrCreate()
topic_name = "Be_"
kafka_broker = "localhost:9092"
producer = KafkaProducer(bootstrap_servers = kafka_broker)
jsonschema = StructType([ \
StructField("id", StringType()), StructField("Date", StringType()), \
StructField("Name", StringType()), StructField("Hour", StringType()), \
StructField("Last_Price", FloatType()), StructField("Var%", FloatType()), \
StructField("Last_Value", FloatType()), StructField("TYpe", StringType())])
df = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", kafka_broker) \
.option("startingOffsets", "latest") \
.option("subscribe", topic_name) \
.load() \
.selectExpr("CAST(value AS STRING)")
def parse_data_from_kafka_message(sdf, schema):
from pyspark.sql.functions import split
assert sdf.isStreaming == True, "DataFrame doesn't receive streaming data"
col = split(sdf['value'], ',') #split attributes to nested array in one Column
#now expand col to multiple top-level columns
for idx, field in enumerate(schema):
sdf = sdf.withColumn(field.name, col.getItem(idx).cast(field.dataType))
return sdf.select([field.name for field in schema])
df= parse_data_from_kafka_message(df, jsonschema)
df \
.writeStream \
.format("mongo") \
.option("com.mongodb.spark.sql.DefaultSource","mongodb://localhost:27017/DataManagement.Data") \
.outputMode("append") \
.start() \
.awaitTermination()
Это ошибка, которая появляется в консоли:
I get this error from the console:
Py4JJavaError: An error occurred while calling o263.start.
: java.lang.UnsupportedOperationException: Data source mongo does not support streamed writing
Я также пытался использовать ForeachWriter:
class ForeachWriter:
def open (self, partition_id, epoch_id):
# Open connection. This method is optional in Python.
self.connection = MongoClient ('mongodb: // localhost: 27017')
self.db = self.connection ['DataManagement']
self.coll = self.db ['Data']
pass
def process (self, row):
# Write row to connection. This method is NOT optional in Python.
# Self.coll = None
self.coll.insert_one (row.asDict ())
pass
def close (self, error):
# Close the connection. This method in optional in Python.
pass
df \
.writeStream \
.foreach (ForeachWriter ()) \
.trigger (processingTime = '3 seconds') \
.outputMode ("Append") \
.option ("truncate", "false") \
.start ()
К сожалению, приемник mongodb не работает в любом случае, и я хотел бы чтобы узнать, есть ли другой способ отправить данные в MongoDB с помощью PySpark или я делаю что-то не так в коде. Большое спасибо