CheckpointWriter сохраняет данные в HDFS для каждого пакета - PullRequest
0 голосов
/ 28 августа 2018

Я отлаживаю проблему для контрольной точки в приложении Apache spark, поэтому я читаю журналы Spark для класса "CheckpointWriter".

Я передаю данные из Kafka и помещаю эти данные в состояние с помощью mapWithState, поэтому контрольная точка для HDFS работает по умолчанию.

Я знаю, что mapWithState проверяет данные в HDFS после каждых 10 пакетов (мой пакетный интервал составляет 10 секунд), то есть через каждые 100 секунд.

 18/08/28 12:09:52 INFO CheckpointWriter: Checkpoint for time 1535472590000 ms saved to file 'hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535472590000', took 22878 bytes and 195 ms
            18/08/28 12:09:53 INFO CheckpointWriter: Submitted checkpoint of time 1535472590000 ms to writer queue
            18/08/28 12:09:53 INFO CheckpointWriter: Saving checkpoint for time 1535472590000 ms to file 'hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535472590000'
            18/08/28 12:09:53 INFO CheckpointWriter: Deleting hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535469380000
            18/08/28 12:09:53 INFO CheckpointWriter: Checkpoint for time 1535472590000 ms saved to file 'hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535472590000', took 22876 bytes and 176 ms
            18/08/28 12:10:01 INFO CheckpointWriter: Submitted checkpoint of time 1535472600000 ms to writer queue
            18/08/28 12:10:01 INFO CheckpointWriter: Saving checkpoint for time 1535472600000 ms to file 'hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535472600000'
            18/08/28 12:10:01 INFO CheckpointWriter: Deleting hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535469390000.bk
            18/08/28 12:10:01 INFO CheckpointWriter: Checkpoint for time 1535472600000 ms saved to file 'hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535472600000', took 22942 bytes and 167 ms
            18/08/28 12:10:02 INFO CheckpointWriter: Saving checkpoint for time 1535472600000 ms to file 'hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535472600000'
            18/08/28 12:10:02 INFO CheckpointWriter: Submitted checkpoint of time 1535472600000 ms to writer queue
            18/08/28 12:10:02 INFO CheckpointWriter: Deleting hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535469390000
            18/08/28 12:10:02 INFO CheckpointWriter: Checkpoint for time 1535472600000 ms saved to file 'hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535472600000', took 22938 bytes and 178 ms
            18/08/28 12:10:12 INFO CheckpointWriter: Submitted checkpoint of time 1535472610000 ms to writer queue
            18/08/28 12:10:12 INFO CheckpointWriter: Saving checkpoint for time 1535472610000 ms to file 'hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535472610000'
            18/08/28 12:10:12 INFO CheckpointWriter: Deleting hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535469400000.bk
            18/08/28 12:10:12 INFO CheckpointWriter: Checkpoint for time 1535472610000 ms saved to file 'hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535472610000', took 23136 bytes and 212 ms
            18/08/28 12:10:12 INFO CheckpointWriter: Submitted checkpoint of time 1535472610000 ms to writer queue
            18/08/28 12:10:12 INFO CheckpointWriter: Saving checkpoint for time 1535472610000 ms to file 'hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535472610000'
            18/08/28 12:10:13 INFO CheckpointWriter: Deleting hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535469400000
            18/08/28 12:10:13 INFO CheckpointWriter: Checkpoint for time 1535472610000 ms saved to file 'hdfs://mycluster/user/user1/sparkCheckpointData/checkpoint-1535472610000', took 23136 bytes and 170 ms

Мои наблюдения:

  • Но когда я смотрю на журнал, я вижу журналы, они показывают, что данные проверяются в HDFS при каждом пакете, и сразу после этого удаляется предыдущая контрольная точка.

  • Для каждого пакета он сохраняет и удаляет данные в HDFS дважды. Я не уверен, что неправильно читаю логи.

Также хотелось бы отметить, что у меня есть два потока, работающих в одном приложении. Я передаю потоковое видео с Kafka в первом потоке и потоковое вещание из EMS (с помощью настраиваемого получателя) во втором потоке. Данные из обоих обновляются в состоянии с помощью mapWithState

Может кто-нибудь пролить свет?

...