Spark Structured Streaming Ошибка при отправке агрегированного результата в тему Kafka - PullRequest
2 голосов
/ 29 января 2020

эксперт по всем большим данным,

Я столкнулся с проблемой при отправке агрегированных результатов в Kafka Topi c. Он отлично работает с преобразованием без агрегации. Может ли кто-нибудь помочь мне решить эту проблему? Агрегированный результат важен для запуска последующих событий и различных логик c. Вот смоделированная проблема. Весь приведенный ниже код протестирован и работает.

Spark версия 2.4.4

Kafka Plugin org. apache .spark: spark- sql -kafka-0-10_2.11 : 2.4.4

Источник данных

#dummy publisher
CLUSTER_NAME=$(/usr/share/google/get_metadata_value attributes/dataproc-cluster-name)  

for i in {0..10000}; do echo "{\"name\":\"${i}\", \"dt\":$(date +%s)}";  sleep 1; done | /usr/lib/kafka/bin/kafka-console-producer.sh     --broker-list ${CLUSTER_NAME}-w-1:9092  --topic test_input

> {"name":"3433", "dt":1580282788} 
> {"name":"3434", "dt":1580282789}
> {"name":"3435", "dt":1580282790} 
> {"name":"3436", "dt":1580282791}

Преобразование ( Без группировка по агрегации)

import time
from pyspark.sql.types import *
from pyspark.sql.functions import *

table='test_input'
wallet_txn_log = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092") \
    .option("subscribe", table) \
    .load() \
    .selectExpr("CAST(value AS STRING) as string").select( from_json("string", schema=   StructType([StructField("dt",LongType(),True),StructField("name",StringType(),True)])    ).alias("x")).select('x.*')\
    .select(['name',col('dt').cast(TimestampType()).alias("txn_datetime")]) \
    .select([to_json(struct('name','txn_datetime')).alias("value")]) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092") \
    .option("topic", "test_output_non_aggregate") \
    .option("checkpointLocation", "gs://gcp-datawarehouse/streaming/checkpoints/streaming_test1-{}".format(table)).start()

Выход работает как положено

/usr/lib/kafka/bin/kafka-console-consumer.sh     --bootstrap-server ${CLUSTER_NAME}-w-1:9092 --topic test_output_non_aggregate

{"name":"2844","txn_datetime":"2020-01-29T15:16:36.000+08:00"}
{"name":"2845","txn_datetime":"2020-01-29T15:16:37.000+08:00"}

Группировка по агрегации

Я пробовал водяной знак и без водяных знаков, оба не работают

table='test_input'
wallet_txn_log = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092") \
    .option("subscribe", table) \
    .load() \
    .selectExpr("CAST(value AS STRING) as string").select( from_json("string", schema= StructType([StructField("dt",LongType(),True),StructField("name",StringType(),True)])  ).alias("x")).select('x.*')\
    .select(['name',col('dt').cast(TimestampType()).alias("txn_datetime")]) \
    .withWatermark("txn_datetime", "5 seconds") \
    .groupBy('name','txn_datetime').agg( 
     count("name").alias("is_txn_count")) \
    .select([to_json(struct('name','is_txn_count')).alias("value")]) \
    .writeStream \
    .format("kafka") \
    .outputMode("update") \
    .option("kafka.bootstrap.servers", "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092") \
    .option("topic", "test_aggregated_output") \
    .option("checkpointLocation", "gs://gcp-datawarehouse/streaming/checkpoints/streaming_test1-aggregated_{}".format(table)).start()

Ошибка

[Stage 1:>                                                        (0 + 3) / 200]20/01/29 16:20:57 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, cep-m.asia-southeast1-c.c.tngd-poc.internal, executor 1): org.apache.spark.util.TaskCompletionListenerException: null
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

журнал пряжи -applicationId xxx

Ссылка на журнал пряжи

Проверка запроса

группировка По агрегации запрос правильный. Работает в консоли и в раковине памяти. Однако в Kafka Sink он продолжает выдавать ошибку.

wallet_txn_log = spark \
...     .readStream \
...     .format("kafka") \
...     .option("kafka.bootstrap.servers", "10.148.15.235:9092,10.148.15.236:9092,10.148.15.233:9092") \
...     .option("subscribe", table) \
...     .load() \
...     .selectExpr("CAST(value AS STRING) as string").select( from_json("string", schema= StructType([StructField("dt",LongType(),True),StructField("name",StringType(),True)])  ).alias("x")).select('x.*')\
...     .select(['name',col('dt').cast(TimestampType()).alias("txn_datetime")]) \
...     .withWatermark("txn_datetime", "5 seconds") \
...     .groupBy('name','txn_datetime').agg( 
...      count("name").alias("is_txn_count")) \
...     .select([to_json(struct('name','is_txn_count')).alias("value")]) 
>>> 
>>> df=wallet_txn_log.writeStream \
...     .outputMode("update") \
...     .option("truncate", False) \
...     .format("console") \
...     .start()
-------------------------------------------                                     
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
+-----+

-------------------------------------------                                     
Batch: 1
-------------------------------------------
+--------------------------------+
|value                           |
+--------------------------------+
|{"name":"4296","is_txn_count":1}|
|{"name":"4300","is_txn_count":1}|
|{"name":"4297","is_txn_count":1}|
|{"name":"4303","is_txn_count":1}|
|{"name":"4299","is_txn_count":1}|
|{"name":"4305","is_txn_count":1}|
|{"name":"4298","is_txn_count":1}|
|{"name":"4304","is_txn_count":1}|
|{"name":"4307","is_txn_count":1}|
|{"name":"4302","is_txn_count":1}|
|{"name":"4301","is_txn_count":1}|
|{"name":"4306","is_txn_count":1}|
|{"name":"4310","is_txn_count":1}|
|{"name":"4309","is_txn_count":1}|
|{"name":"4308","is_txn_count":1}|
+--------------------------------+

1 Ответ

0 голосов
/ 04 марта 2020

Код группирования по агрегации правильный, так же, как в плагине Kafka для spark 2.4.4 в этом случае есть небольшая ошибка. После понижения искры с 2.4.4 до 2.4.3. Ошибка выше исчезла.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...