эксперт по всем большим данным,
Я столкнулся с проблемой при отправке агрегированных результатов в Kafka Topi c. Он отлично работает с преобразованием без агрегации. Может ли кто-нибудь помочь мне решить эту проблему? Агрегированный результат важен для запуска последующих событий и различных логик c. Вот смоделированная проблема. Весь приведенный ниже код протестирован и работает.
Spark версия 2.4.4
Kafka Plugin org. apache .spark: spark- sql -kafka-0-10_2.11 : 2.4.4
Источник данных
#dummy publisher
CLUSTER_NAME=$(/usr/share/google/get_metadata_value attributes/dataproc-cluster-name)
for i in {0..10000}; do echo "{\"name\":\"${i}\", \"dt\":$(date +%s)}"; sleep 1; done | /usr/lib/kafka/bin/kafka-console-producer.sh --broker-list ${CLUSTER_NAME}-w-1:9092 --topic test_input
> {"name":"3433", "dt":1580282788}
> {"name":"3434", "dt":1580282789}
> {"name":"3435", "dt":1580282790}
> {"name":"3436", "dt":1580282791}
Преобразование ( Без группировка по агрегации)
import time
from pyspark.sql.types import *
from pyspark.sql.functions import *
table='test_input'
wallet_txn_log = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092") \
.option("subscribe", table) \
.load() \
.selectExpr("CAST(value AS STRING) as string").select( from_json("string", schema= StructType([StructField("dt",LongType(),True),StructField("name",StringType(),True)]) ).alias("x")).select('x.*')\
.select(['name',col('dt').cast(TimestampType()).alias("txn_datetime")]) \
.select([to_json(struct('name','txn_datetime')).alias("value")]) \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092") \
.option("topic", "test_output_non_aggregate") \
.option("checkpointLocation", "gs://gcp-datawarehouse/streaming/checkpoints/streaming_test1-{}".format(table)).start()
Выход работает как положено
/usr/lib/kafka/bin/kafka-console-consumer.sh --bootstrap-server ${CLUSTER_NAME}-w-1:9092 --topic test_output_non_aggregate
{"name":"2844","txn_datetime":"2020-01-29T15:16:36.000+08:00"}
{"name":"2845","txn_datetime":"2020-01-29T15:16:37.000+08:00"}
Группировка по агрегации
Я пробовал водяной знак и без водяных знаков, оба не работают
table='test_input'
wallet_txn_log = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092") \
.option("subscribe", table) \
.load() \
.selectExpr("CAST(value AS STRING) as string").select( from_json("string", schema= StructType([StructField("dt",LongType(),True),StructField("name",StringType(),True)]) ).alias("x")).select('x.*')\
.select(['name',col('dt').cast(TimestampType()).alias("txn_datetime")]) \
.withWatermark("txn_datetime", "5 seconds") \
.groupBy('name','txn_datetime').agg(
count("name").alias("is_txn_count")) \
.select([to_json(struct('name','is_txn_count')).alias("value")]) \
.writeStream \
.format("kafka") \
.outputMode("update") \
.option("kafka.bootstrap.servers", "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092") \
.option("topic", "test_aggregated_output") \
.option("checkpointLocation", "gs://gcp-datawarehouse/streaming/checkpoints/streaming_test1-aggregated_{}".format(table)).start()
Ошибка
[Stage 1:> (0 + 3) / 200]20/01/29 16:20:57 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, cep-m.asia-southeast1-c.c.tngd-poc.internal, executor 1): org.apache.spark.util.TaskCompletionListenerException: null
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
журнал пряжи -applicationId xxx
Ссылка на журнал пряжи
Проверка запроса
группировка По агрегации запрос правильный. Работает в консоли и в раковине памяти. Однако в Kafka Sink он продолжает выдавать ошибку.
wallet_txn_log = spark \
... .readStream \
... .format("kafka") \
... .option("kafka.bootstrap.servers", "10.148.15.235:9092,10.148.15.236:9092,10.148.15.233:9092") \
... .option("subscribe", table) \
... .load() \
... .selectExpr("CAST(value AS STRING) as string").select( from_json("string", schema= StructType([StructField("dt",LongType(),True),StructField("name",StringType(),True)]) ).alias("x")).select('x.*')\
... .select(['name',col('dt').cast(TimestampType()).alias("txn_datetime")]) \
... .withWatermark("txn_datetime", "5 seconds") \
... .groupBy('name','txn_datetime').agg(
... count("name").alias("is_txn_count")) \
... .select([to_json(struct('name','is_txn_count')).alias("value")])
>>>
>>> df=wallet_txn_log.writeStream \
... .outputMode("update") \
... .option("truncate", False) \
... .format("console") \
... .start()
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------------------+
|value |
+--------------------------------+
|{"name":"4296","is_txn_count":1}|
|{"name":"4300","is_txn_count":1}|
|{"name":"4297","is_txn_count":1}|
|{"name":"4303","is_txn_count":1}|
|{"name":"4299","is_txn_count":1}|
|{"name":"4305","is_txn_count":1}|
|{"name":"4298","is_txn_count":1}|
|{"name":"4304","is_txn_count":1}|
|{"name":"4307","is_txn_count":1}|
|{"name":"4302","is_txn_count":1}|
|{"name":"4301","is_txn_count":1}|
|{"name":"4306","is_txn_count":1}|
|{"name":"4310","is_txn_count":1}|
|{"name":"4309","is_txn_count":1}|
|{"name":"4308","is_txn_count":1}|
+--------------------------------+