Как использовать структурированную Spark Streaming в pySpark для вставки строки в Mongodb? - PullRequest
0 голосов
/ 06 апреля 2020

Я пытаюсь интегрировать Kafka с Spark-Structured-Streaming в PySpark в MongoDB Sink. Мне нужна помощь по исправлению моего кода, если я ошибаюсь

Получил интегрированные Kafka-PySpark и PySpark-Mon go. Сейчас пытаюсь интегрировать конвейер от Kafka-PySpark-Mon go

Я использую pyspark 2.4.5.

Это мой код:

spark = SparkSession.builder \
  .appName("Spark Structured Streaming from Kafka") \
  .getOrCreate()

topic_name = "Be_"

kafka_broker = "localhost:9092"

producer = KafkaProducer(bootstrap_servers = kafka_broker)

jsonschema = StructType([ \
  StructField("id", StringType()), StructField("Date", StringType()), \
  StructField("Name", StringType()), StructField("Hour", StringType()), \
  StructField("Last_Price", FloatType()), StructField("Var%", FloatType()), \
  StructField("Last_Value", FloatType()), StructField("TYpe", StringType())])

df = spark.readStream.format("kafka") \
          .option("kafka.bootstrap.servers", kafka_broker) \
          .option("startingOffsets", "latest") \
          .option("subscribe", topic_name) \
          .load() \
          .selectExpr("CAST(value AS STRING)") 

def parse_data_from_kafka_message(sdf, schema):
    from pyspark.sql.functions import split
    assert sdf.isStreaming == True, "DataFrame doesn't receive streaming data"
    col = split(sdf['value'], ',') #split attributes to nested array in one Column
    #now expand col to multiple top-level columns
    for idx, field in enumerate(schema): 
        sdf = sdf.withColumn(field.name, col.getItem(idx).cast(field.dataType))
    return sdf.select([field.name for field in schema])

df= parse_data_from_kafka_message(df, jsonschema)

df \
  .writeStream \
  .format("mongo") \
  .option("com.mongodb.spark.sql.DefaultSource","mongodb://localhost:27017/DataManagement.Data") \
  .outputMode("append") \
  .start() \
  .awaitTermination()

Это ошибка, которая появляется в консоли:

I get this error from the console:
Py4JJavaError: An error occurred while calling o263.start.
: java.lang.UnsupportedOperationException: Data source mongo does not support streamed writing

Я также пытался использовать ForeachWriter:

class ForeachWriter:
     def open (self, partition_id, epoch_id):
         # Open connection. This method is optional in Python.
         self.connection = MongoClient ('mongodb: // localhost: 27017')
         self.db = self.connection ['DataManagement']
         self.coll = self.db ['Data']
         pass

     def process (self, row):
         # Write row to connection. This method is NOT optional in Python.
         # Self.coll = None
         self.coll.insert_one (row.asDict ())
         pass

     def close (self, error):
         # Close the connection. This method in optional in Python.
         pass

     df \
         .writeStream \
         .foreach (ForeachWriter ()) \
         .trigger (processingTime = '3 seconds') \
         .outputMode ("Append") \
         .option ("truncate", "false") \
         .start ()

К сожалению, приемник mongodb не работает в любом случае, и я хотел бы чтобы узнать, есть ли другой способ отправить данные в MongoDB с помощью PySpark или я делаю что-то не так в коде. Большое спасибо

...