Я получил много сообщений о ведении контрольной точки в Spark, но у меня возникла проблема с поворотом и мозговой атакой, на которую я еще не получил ни одной надлежащей документации или ответа.
Итак, согласно приведенному ниже фрагменту -
dataframe
.writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF
.select(trackerdataMap.keySet.toSeq.map(col): _*)
.write
.mode("Append")
.mongo(WriteConfig("mongo.dbname", "mongo.tablename1"))
batchDF
.select(trackerDataRawMap.keySet.toSeq.map(col): _*)
.write
.mode("Append")
.mongo(WriteConfig("mongo.dbname", "mongo.tablename2"))
batchDF.unpersist()
}
.outputMode(OutputMode.Append())
.option("checkpointLocation", checkpointDirectory)
Я записываю свой поток данных в 2 таблицы. Данные не совпадают в обеих таблицах. для каждой строки информационного кадра - некоторые столбцы go в "table1", в то время как другие будут go в "table2", и у нас также есть некоторые общие столбцы в обеих таблицах.
например -
1 Row of
Dataframe => Row(c1,c2,c3,c4,c5,c6)
table1 => Row(c1,c3,c4)
table2 => Row(c1,c2,c5,c6)
Теперь приложение получило ошибку в любой момент времени. Итак, разберитесь с вариантом использования в соответствии с таблицей ниже.
table1 table2 checkpoint
kafka_offset 10 7 11
- Table1 вставлен до kafka_offset- 10
- Table2 вставлен до kafka_offset- 7
Контрольная точка в каталоге говорит 11.
Теперь, когда я перезапущу приложение, каким должно быть самое первое использованное смещение?
Согласно текущему пониманию, оно должно начинаться с 11-го смещения.
Но когда приложение было запущено снова, и данные были получены из 8-го смещения, и я обнаружил повторяющиеся записи в таблице 1 для 3 записей смещения - 8-е, 9-е и 10-е смещение дублировались.
Теперь вопрос Возникает !!!!
В случае множественной вставки, Checkpoint поддерживается на каком основании.
- Вставка в соответствии с 1-й вставкой таблицы
- Вставка в соответствии с 2-й вставкой таблицы
- Случайные или нет какие-либо конкретные c критерии
Как это возможно?
Есть ли какое-то отставание в понимании?
Если это правильное поведение, то что цель поддержания контрольной точки?
Как приложение Spark узнает, что table2 отстает с 3 записями, даже если контрольная точка синхронизируется c с таблицей1?
У меня есть правильные доказательства, файлы и контент. Любая информация, необходимая для понимания моего случая, пожалуйста, спросите и bru sh мое понимание этого странного поведения