Spark - поток Кафка в файл, который меняется каждый день? - PullRequest
4 голосов
/ 15 марта 2019

У меня есть поток kafka, я буду обрабатывать в искре.Я хочу записать вывод этого потока в файл.Однако я хочу разделить эти файлы по дням, поэтому каждый день они начнут записывать в новый файл.Можно ли что-то подобное сделать?Я хочу, чтобы это осталось запущенным, и когда наступит новый день, он переключится на запись в новый файл.

val streamInputDf = spark.readStream.format("kafka")
                    .option("kafka.bootstrapservers", "XXXX")
                    .option("subscribe", "XXXX")
                    .load()
val streamSelectDf = streamInputDf.select(...)

streamSelectDf.writeStream.format("parquet)
     .option("path", "xxx")
     ???

Ответы [ 2 ]

2 голосов
/ 15 марта 2019

Добавление раздела из искры может быть сделано с помощью partitionBy, предоставленной в DataFrameWriter для потоковой передачи, или с DataStreamWriter для потоковой передачи данных.


Ниже приведены сигнатуры:

общедоступный DataFrameWriter partitionBy (scala.collection.Seq colNames)

DataStreamWriter partitionBy (scala.collection.Seq colNames) Разделение вывода по заданномустолбцы в файловой системе.

DataStreamWriter partitionBy (String ... colNames) Распределяет выходные данные по заданным столбцам в файловой системе.

Описание: partitionBy public DataStreamWriter partitionBy (String ... colNames) Распределяет выходные данные поданные столбцы в файловой системе.Если указано, выходные данные размещаются в файловой системе аналогично схеме разбиения Hive.Например, когда мы разбиваем набор данных по годам, а затем по месяцам, макет каталога будет выглядеть следующим образом:

- year=2016/month=01/ 
- year=2016/month=02/

Секционирование - это один из наиболее широко используемых методов оптимизации физического макета данных.Он обеспечивает грубый индекс для пропуска ненужных чтений данных, когда запросы имеют предикаты для разделенных столбцов.Чтобы разделение работало хорошо, количество отдельных значений в каждом столбце обычно должно быть меньше десятков тысяч.

Параметры: colNames - (недокументированные) Возвращает: (недокументированное) Так как: 2.0.0

, поэтому, если вы хотите разбить данные по годам и месяцам, спарк сохранит данные в папку, например:

year=2019/month=01/05
year=2019/month=02/05

Опция 1 (прямая запись): Вы упомянули паркет - вы можете использовать сохранение как формат паркета с:

df.write.partitionBy('year', 'month','day').format("parquet").save(path)

Опция 2 (вставить в улей, используя тот же partitionBy):

Вы также можете вставить в таблицу улья, как:

df.write.partitionBy('year', 'month', 'day').insertInto(String tableName)

Получение всех разделов улья:

Spark sql основан на языке запросов улья, поэтому вы можете использовать SHOW PARTITIONS

, чтобы получить список разделов в конкретной таблице.

sparkSession.sql("SHOW PARTITIONS partitionedHiveParquetTable")

Вывод: Я бы предложил вариант 2... так как Advantage позже, вы можете запрашивать данные на основе раздела (иначе запрос необработанных данных, чтобы узнать, что у вас естьполучен), а базовый файл может быть паркетным или орк.

Примечание:

Просто убедитесь, что у вас есть .enableHiveSupport() при создании сеансас помощью SparkSessionBuilder, а также убедитесь, что вы правильно настроили hive-conf.xml и т. д.

1 голос
/ 15 марта 2019

Исходя из этого ответа spark должен иметь возможность записи в папку на основе года, месяца и дня, что, по-видимому, именно то, что вы ищете.Не пробовал это в потоковой передаче, но, надеюсь, этот пример выведет вас на правильный путь:

df.write.partitionBy("year", "month", "day").format("parquet").save(outPath)

Если нет, вы можете добавить переменную filepath на основе current_date()

...