Я разделил данные в HDFS. В какой-то момент я решил обновить его. Алгоритм:
- Чтение новых данных из темы кафки.
- Узнайте имена разделов новых данных.
- Загрузка данных из разделов с этими именами в HDFS.
- Объединить данные HDFS с новыми данными.
- Перезаписать разделы, которые уже есть на диске.
Проблема в том, что если в новых данных есть разделы, которых еще нет на диске. В этом случае они не пишутся. https://stackoverflow.com/a/49691528/10681828
На картинке выше описана ситуация. Давайте подумаем о левом диске как о разделах, которые уже есть в HDFS, а о правом диске - как о разделах, которые мы только что получили от Kafka.
Некоторые разделы правого диска будут пересекаться с уже существующими, другие - нет. И этот код:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
dataFrame
.write
.mode(SaveMode.Overwrite)
.partitionBy("date", "key")
.option("header", "true")
.format(format)
.save(path)
не может записать синюю часть изображения на диск.
Итак, как мне решить эту проблему? Пожалуйста, предоставьте код. Я ищу что-то совершенное.
Пример для тех, кто не понимает:
Предположим, у нас есть эти данные в HDFS:
- Раздел A имеет данные "1"
- Раздел B содержит данные "1"
Теперь мы получаем эти новые данные:
- Раздел B имеет данные "2"
- PartitionC имеет данные "1"
Итак, разделы A и B находятся в HDFS, а разделы B и C - новые, и, поскольку B находится в HDFS, мы обновляем его. И я хочу, чтобы C был написан. Таким образом, конечный результат должен выглядеть следующим образом:
- Раздел A имеет данные "1"
- Раздел B содержит данные "2"
- PartitionC имеет данные "1"
Но если я использую код сверху, я получаю это:
- Раздел A имеет данные "1"
- Раздел B содержит данные "2"
Поскольку новая функция overwrite dynamic
из spark 2.3 не может создавать PartitionC.
Обновление : Оказывается, если вы используете вместо этого таблицы улья, это будет работать. Но если вы используете чистую искру, это не ... Так что, я думаю, перезапись улья и перезапись искры работают по-разному.