Я пытаюсь построить конвейер etl, используя pyspark и kafka.Мне нужно сохранить потоки для будущих операций над ними.Я попытался выполнить потоковую передачу с отслеживанием состояния с помощью updateStateByKey, и она работает в течение короткого времени, прежде чем Spark отключает защиту от rdds (из контрольной точки) и пытается снова получить к ним доступ, и приложение вылетает с исключением FileNotFound.Ниже приведен мой код:
sc = SparkContext(conf=spark_conf)
sc.setLogLevel("INFO")
spark = SparkSession(sparkContext=sc)
ssc = StreamingContext(sc, ssc_config['batchDuration'])
config = getConfig()['kafka']
kafka_stream = KafkaStream.create_source(ssc, config, "mytopic")
new_msg = get_new_msg_stream(kafka_stream)
transformed_msg = transform_msg(new_msg).updateStateByKey(updateState)
ssc.checkpoint("./")
ssc.start()
ssc.awaitTermination()
def updateState(new_state, old_state):
if len(new_state) > 0:
return new_state
return old_state
Я не понимаю, почему это происходит или если потоковая передача с отслеживанием состояния была хорошей идеей для моего варианта использования, потому что мне нужно поддерживать состояние навсегда.Вот трассировка стека драйверов:
JobScheduler:54 - Finished job streaming job 1524592150000 ms.0 from job set of time 1524592150000 ms
PythonRDD:54 - Removing RDD 60 from persistence list
BlockManager:54 - Removing RDD 60
CheckpointWriter:54 - Submitted checkpoint of time 1524592150000 ms to writer queue
CheckpointWriter:54 - Saving checkpoint for time 1524592150000 ms to file 'file:/home/jovyan/ampath/checkpoint-1524592195000'
SparkContext:54 - Starting job: runJob at PythonRDD.scala:141
DAGScheduler:54 - Got job 38 (runJob at PythonRDD.scala:141) with 1 output partitions
DAGScheduler:54 - Submitting ResultStage 46 (PythonRDD[138] at RDD at PythonRDD.scala:48), which has no missing parents
TaskSchedulerImpl:54 - Cancelling stage 46
DAGScheduler:54 - ResultStage 46 (runJob at PythonRDD.scala:141) failed in Unknown s due to Job aborted due to stage failure: Task creation failed: java.io.FileNotFoundException: File file:/home/jovyan/ampath/c1949961-34bc-4f6b-b846-247fb8f73ea4/rdd-60/part-00000 does not exist
java.io.FileNotFoundException: File file:/home/jovyan/ampath/c1949961-34bc-4f6b-b846-247fb8f73ea4/rdd-60/part-00000 does not exist at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
Вот определение метода transform_msg:
def transform_msg(msg_stream):
transformed_stream = msg_stream.transform(lambda rdd: transform(rdd))
def transform(rdd):
if not rdd.isEmpty():
msg_df = rdd.toDF()
transformed_msg = group_obs(msg_df).rdd
return transformed_msg
return transformed_stream
def group_msgs(obs_df):
cols = [f.when(~f.col(x).isin("null"), f.col(x)).alias(x) for x in obs_df.columns if x != "obs_group_id"]
obs = obs_df.select(*cols,"obs_group_id")
grouped_by_obsgroup = filtered_obs_with_value\
.withColumn("strObs", f.struct(f.col("obs_id"), f.col("obs_voided"),
f.col("concept_id"), f.col("value"), f.col("value_type"), f.col("obs_date").alias("obs_datetime")))\
.groupBy("obs_group_id", "encounter_id") \
.agg(f.struct(f.col("obs_group_id"),f.collect_list("strObs").alias("obs")).alias("obs"))
grouped_by_encounter = grouped_by_obsgroup \
.groupBy("encounter_id")\
.agg(f.to_json(f.collect_list(f.col("obs")).alias("obs")))
return grouped_by_encounter
Моя цель - посмотреть, что я могу использовать для обогащения своего потока.Есть лучший способ сделать это?Любая помощь будет принята с благодарностью.