Spark Streaming Ведение потоковых данных между пакетами. - PullRequest
0 голосов
/ 24 апреля 2018

Я пытаюсь построить конвейер etl, используя pyspark и kafka.Мне нужно сохранить потоки для будущих операций над ними.Я попытался выполнить потоковую передачу с отслеживанием состояния с помощью updateStateByKey, и она работает в течение короткого времени, прежде чем Spark отключает защиту от rdds (из контрольной точки) и пытается снова получить к ним доступ, и приложение вылетает с исключением FileNotFound.Ниже приведен мой код:

        sc = SparkContext(conf=spark_conf)
        sc.setLogLevel("INFO")
        spark = SparkSession(sparkContext=sc)

        ssc = StreamingContext(sc, ssc_config['batchDuration'])

        config = getConfig()['kafka']
        kafka_stream = KafkaStream.create_source(ssc, config, "mytopic")
        new_msg = get_new_msg_stream(kafka_stream)
        transformed_msg = transform_msg(new_msg).updateStateByKey(updateState)

        ssc.checkpoint("./")
        ssc.start()
        ssc.awaitTermination()

        def updateState(new_state, old_state):
            if len(new_state) > 0:
               return new_state
            return old_state

Я не понимаю, почему это происходит или если потоковая передача с отслеживанием состояния была хорошей идеей для моего варианта использования, потому что мне нужно поддерживать состояние навсегда.Вот трассировка стека драйверов:

JobScheduler:54 - Finished job streaming job 1524592150000 ms.0 from job set of time 1524592150000 ms
PythonRDD:54 - Removing RDD 60 from persistence list
BlockManager:54 - Removing RDD 60
CheckpointWriter:54 - Submitted checkpoint of time 1524592150000 ms to writer queue
CheckpointWriter:54 - Saving checkpoint for time 1524592150000 ms to file 'file:/home/jovyan/ampath/checkpoint-1524592195000'
SparkContext:54 - Starting job: runJob at PythonRDD.scala:141
DAGScheduler:54 - Got job 38 (runJob at PythonRDD.scala:141) with 1 output partitions
DAGScheduler:54 - Submitting ResultStage 46 (PythonRDD[138] at RDD at PythonRDD.scala:48), which has no missing parents
TaskSchedulerImpl:54 - Cancelling stage 46
DAGScheduler:54 - ResultStage 46 (runJob at PythonRDD.scala:141) failed in Unknown s due to Job aborted due to stage failure: Task creation failed: java.io.FileNotFoundException: File file:/home/jovyan/ampath/c1949961-34bc-4f6b-b846-247fb8f73ea4/rdd-60/part-00000 does not exist
java.io.FileNotFoundException: File file:/home/jovyan/ampath/c1949961-34bc-4f6b-b846-247fb8f73ea4/rdd-60/part-00000 does not exist at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)

Вот определение метода transform_msg:

    def transform_msg(msg_stream):
    transformed_stream = msg_stream.transform(lambda rdd: transform(rdd))

    def transform(rdd):
        if not rdd.isEmpty():
            msg_df = rdd.toDF()
            transformed_msg = group_obs(msg_df).rdd
            return transformed_msg
    return transformed_stream

def group_msgs(obs_df):
    cols = [f.when(~f.col(x).isin("null"), f.col(x)).alias(x) for x in obs_df.columns if x != "obs_group_id"]

    obs = obs_df.select(*cols,"obs_group_id")

    grouped_by_obsgroup = filtered_obs_with_value\
                                    .withColumn("strObs", f.struct(f.col("obs_id"), f.col("obs_voided"),
                                    f.col("concept_id"), f.col("value"), f.col("value_type"), f.col("obs_date").alias("obs_datetime")))\
                                    .groupBy("obs_group_id", "encounter_id") \
                                    .agg(f.struct(f.col("obs_group_id"),f.collect_list("strObs").alias("obs")).alias("obs"))

    grouped_by_encounter = grouped_by_obsgroup \
        .groupBy("encounter_id")\
        .agg(f.to_json(f.collect_list(f.col("obs")).alias("obs")))

    return grouped_by_encounter

Моя цель - посмотреть, что я могу использовать для обогащения своего потока.Есть лучший способ сделать это?Любая помощь будет принята с благодарностью.

...