KafkaConsumer не является безопасным для многопоточного доступа Pyspark - PullRequest
0 голосов
/ 25 апреля 2018

Я использую структурированную потоковую передачу с использованием Kafka, однако при попытке записать поток на консоль я получаю сообщение об ошибке:

Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

Вот мой код:

def group_obs(obs_df):
        obs = obs_df.select(f.col("obs.payload.after").alias("obs"))

        filtered_obs_with_value = obs \
            .union(obs.filter("obs.value_datetime is not null")
                   .withColumn("value", f.col("obs.value_datetime"))
                   .withColumn("value_type", f.lit("datetime")))


        grouped_by_obsgroup = filtered_obs_with_value\
                             .groupBy("obs.obs_group_id", "obs.encounter_id")
                             .agg(f.struct(f.col("obs.obs_group_id"),f.collect_list("tempObs").alias("obs")).alias("obs"))

        query = grouped_by_obsgroup \
                .writeStream \
                .outputMode("update") \
                .format("console") \
                .start()

        query.awaitTermination()

raw_obs = kafka_stream.select(from_json(col("value").cast("string"),mySchema)
transformed_obs = group_obs(raw_obs)

1 Ответ

0 голосов
/ 28 апреля 2018

В вашем коде нет ничего плохого.

Это известная ошибка, отслеживаемая SPARK-23636 . Существует также аналогичная проблема с прямой DStream, отслеживаемой SPARK-19185 .

Согласно билету JIRA:

Единственный обходной путь - запускать наши приложения с executor-cores = 1, с включенным динамическим распределением ресурсов.

, что может или не может быть приемлемо в вашем случае.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...