Я использую структурированную потоковую передачу с использованием Kafka, однако при попытке записать поток на консоль я получаю сообщение об ошибке:
Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
Вот мой код:
def group_obs(obs_df):
obs = obs_df.select(f.col("obs.payload.after").alias("obs"))
filtered_obs_with_value = obs \
.union(obs.filter("obs.value_datetime is not null")
.withColumn("value", f.col("obs.value_datetime"))
.withColumn("value_type", f.lit("datetime")))
grouped_by_obsgroup = filtered_obs_with_value\
.groupBy("obs.obs_group_id", "obs.encounter_id")
.agg(f.struct(f.col("obs.obs_group_id"),f.collect_list("tempObs").alias("obs")).alias("obs"))
query = grouped_by_obsgroup \
.writeStream \
.outputMode("update") \
.format("console") \
.start()
query.awaitTermination()
raw_obs = kafka_stream.select(from_json(col("value").cast("string"),mySchema)
transformed_obs = group_obs(raw_obs)