Страновая контрольная точка Spark в случае множественной вставки в пакете странное поведение - PullRequest
0 голосов
/ 05 марта 2020

Я получил много сообщений о ведении контрольной точки в Spark, но у меня возникла проблема с поворотом и мозговой атакой, на которую я еще не получил ни одной надлежащей документации или ответа.

Итак, согласно приведенному ниже фрагменту -

        dataframe
        .writeStream
        .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
          batchDF.persist()

          batchDF
            .select(trackerdataMap.keySet.toSeq.map(col): _*)
            .write
            .mode("Append")
            .mongo(WriteConfig("mongo.dbname", "mongo.tablename1"))

          batchDF
            .select(trackerDataRawMap.keySet.toSeq.map(col): _*)
            .write
            .mode("Append")
            .mongo(WriteConfig("mongo.dbname", "mongo.tablename2"))

          batchDF.unpersist()

        }
        .outputMode(OutputMode.Append())
        .option("checkpointLocation", checkpointDirectory)

Я записываю свой поток данных в 2 таблицы. Данные не совпадают в обеих таблицах. для каждой строки информационного кадра - некоторые столбцы go в "table1", в то время как другие будут go в "table2", и у нас также есть некоторые общие столбцы в обеих таблицах.

например -

1 Row of 
Dataframe => Row(c1,c2,c3,c4,c5,c6)
table1    => Row(c1,c3,c4)
table2    => Row(c1,c2,c5,c6)

Теперь приложение получило ошибку в любой момент времени. Итак, разберитесь с вариантом использования в соответствии с таблицей ниже.

               table1 table2 checkpoint
kafka_offset     10     7       11
  • Table1 вставлен до kafka_offset- 10
  • Table2 вставлен до kafka_offset- 7

Контрольная точка в каталоге говорит 11.

Теперь, когда я перезапущу приложение, каким должно быть самое первое использованное смещение?

Согласно текущему пониманию, оно должно начинаться с 11-го смещения.

Но когда приложение было запущено снова, и данные были получены из 8-го смещения, и я обнаружил повторяющиеся записи в таблице 1 для 3 записей смещения - 8-е, 9-е и 10-е смещение дублировались.

Теперь вопрос Возникает !!!!

  • В случае множественной вставки, Checkpoint поддерживается на каком основании.

    • Вставка в соответствии с 1-й вставкой таблицы
    • Вставка в соответствии с 2-й вставкой таблицы
    • Случайные или нет какие-либо конкретные c критерии
  • Как это возможно?

  • Есть ли какое-то отставание в понимании?

  • Если это правильное поведение, то что цель поддержания контрольной точки?

  • Как приложение Spark узнает, что table2 отстает с 3 записями, даже если контрольная точка синхронизируется c с таблицей1?

У меня есть правильные доказательства, файлы и контент. Любая информация, необходимая для понимания моего случая, пожалуйста, спросите и bru sh мое понимание этого странного поведения

...