Как использовать foreach или foreachBatch в PySpark для записи в базу данных? - PullRequest
1 голос
/ 08 ноября 2019

Я хочу сделать Spark Structured Streaming (Spark 2.4.x) из источника Kafka в MariaDB с Python (PySpark).

Я хочу использовать потоковый фрейм данных Spark, а не статический или фрейм данных Pandas.

Похоже, что нужно использовать foreach или foreachBatch, поскольку нет возможных приемников базы данных для потоковых данных в соответствии с https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks.

Вот моя попытка:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType
from pyspark.sql import DataFrameWriter
# configuration of target db
db_target_url = "jdbc:mysql://localhost/database"
db_target_properties = {"user":"writer", "password":"1234"}
# schema
schema_simple = StructType([StructField("Signal", StringType()),StructField("Value", DoubleType())])

# create spark session
spark = SparkSession.builder.appName("streamer").getOrCreate()

# create DataFrame representing the stream
df = spark.readStream \
  .format("kafka").option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "mytopic") \
  .load() \
  .selectExpr("Timestamp", "cast (value as string) as json") \
  .select("Timestamp", F.from_json("json", schema_simple).alias('json_wrapper')) \
  .selectExpr("Timestamp", "json_wrapper.Signal", "json_wrapper.Value")
df.printSchema()
# Do some dummy processing
df2 = df.filter("Value < 11111111111")
print("df2: ", df2.isStreaming)

def process_row(row):
    # Process row
    row.write.jdbc(url=db_target_url, table="mytopic", mode="append", properties=db_target_properties)
    pass
query = df2.writeStream.foreach(process_row).start()

Я получаю сообщение об ошибке:

Ошибка атрибута: запись

Почему?

1 Ответ

1 голос
/ 09 ноября 2019

tl; др Заменить foreach на foreachBatch.


Цитирование официальной документации :

Операции foreach и foreachBatch позволяют применять произвольные операции и записывать логику на вывод потокового запроса. Они имеют несколько разные варианты использования - в то время как foreach позволяет настраивать логику записи в каждой строке, foreachBatch допускает произвольные операции и настраиваемую логику на выходе каждой микропакета.

Другими словами, ваш writeStream.foreach(process_row) действует на одну строку (данных), которой нет write.jdbc, и, следовательно, ошибка.

Думайте о строке как очасть данных, которую вы можете сохранить где угодно, используя любой API.

Если вам действительно нужна поддержка от Spark (и вы действительно используете write.jdbc), вам следует использовать foreachBatch.

* 1029. *

, в то время как foreach разрешает настраиваемую логику записи в каждой строке, foreachBatch разрешает произвольные операции и настраиваемую логику на выходе каждой микропакета.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...