Я отлаживаю проблему для контрольной точки в приложении Apache spark, поэтому я читаю журналы Spark для класса "CheckpointWriter".
Я передаю данные из Kafka и помещаю эти данные в состояние с помощью mapWithState, поэтому контрольная точка для HDFS работает по умолчанию.
Я знаю, что mapWithState проверяет данные в HDFS после каждых 10 пакетов (мой пакетный интервал составляет 10 секунд), то есть через каждые 100 секунд.
18/08/28 12:09:52 INFO CheckpointWriter: Checkpoint for time 1535472590000 ms saved to file 'hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535472590000', took 22878 bytes and 195 ms
18/08/28 12:09:53 INFO CheckpointWriter: Submitted checkpoint of time 1535472590000 ms to writer queue
18/08/28 12:09:53 INFO CheckpointWriter: Saving checkpoint for time 1535472590000 ms to file 'hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535472590000'
18/08/28 12:09:53 INFO CheckpointWriter: Deleting hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535469380000
18/08/28 12:09:53 INFO CheckpointWriter: Checkpoint for time 1535472590000 ms saved to file 'hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535472590000', took 22876 bytes and 176 ms
18/08/28 12:10:01 INFO CheckpointWriter: Submitted checkpoint of time 1535472600000 ms to writer queue
18/08/28 12:10:01 INFO CheckpointWriter: Saving checkpoint for time 1535472600000 ms to file 'hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535472600000'
18/08/28 12:10:01 INFO CheckpointWriter: Deleting hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535469390000.bk
18/08/28 12:10:01 INFO CheckpointWriter: Checkpoint for time 1535472600000 ms saved to file 'hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535472600000', took 22942 bytes and 167 ms
18/08/28 12:10:02 INFO CheckpointWriter: Saving checkpoint for time 1535472600000 ms to file 'hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535472600000'
18/08/28 12:10:02 INFO CheckpointWriter: Submitted checkpoint of time 1535472600000 ms to writer queue
18/08/28 12:10:02 INFO CheckpointWriter: Deleting hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535469390000
18/08/28 12:10:02 INFO CheckpointWriter: Checkpoint for time 1535472600000 ms saved to file 'hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535472600000', took 22938 bytes and 178 ms
18/08/28 12:10:12 INFO CheckpointWriter: Submitted checkpoint of time 1535472610000 ms to writer queue
18/08/28 12:10:12 INFO CheckpointWriter: Saving checkpoint for time 1535472610000 ms to file 'hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535472610000'
18/08/28 12:10:12 INFO CheckpointWriter: Deleting hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535469400000.bk
18/08/28 12:10:12 INFO CheckpointWriter: Checkpoint for time 1535472610000 ms saved to file 'hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535472610000', took 23136 bytes and 212 ms
18/08/28 12:10:12 INFO CheckpointWriter: Submitted checkpoint of time 1535472610000 ms to writer queue
18/08/28 12:10:12 INFO CheckpointWriter: Saving checkpoint for time 1535472610000 ms to file 'hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535472610000'
18/08/28 12:10:13 INFO CheckpointWriter: Deleting hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535469400000
18/08/28 12:10:13 INFO CheckpointWriter: Checkpoint for time 1535472610000 ms saved to file 'hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535472610000', took 23136 bytes and 170 ms
Мои наблюдения:
Но когда я смотрю на журнал, я вижу журналы, они показывают, что данные проверяются в HDFS при каждом пакете, и сразу после этого удаляется предыдущая контрольная точка.
Для каждого пакета он сохраняет и удаляет данные в HDFS дважды. Я не уверен, что неправильно читаю логи.
Также хотелось бы отметить, что у меня есть два потока, работающих в одном приложении.
Я передаю потоковое видео с Kafka в первом потоке и потоковое вещание из EMS (с помощью настраиваемого получателя) во втором потоке.
Данные из обоих обновляются в состоянии с помощью mapWithState
Может кто-нибудь пролить свет?