• 1000 для сохранения в таблицах DeltaLake. Каждый источник данных - это просто папка в s3 с файлами avro. У каждого источника данных своя схема. Каждый источник данных должен сохраняться в собственной таблице DeltaLake. Требуется небольшое преобразование, кроме avro -> delta, только обогащение некоторыми дополнительными полями, полученными из имени файла. Новые файлы добавляются с умеренной скоростью, от одного раза в минуту до одного раза в день, в зависимости от источника данных. У меня есть уведомление kafka, когда появляются новые данные, с описанием того, какие данные и путь к файлу s3.
Предположим, есть два источника данных - A и B. A - это файлы s3: // bucket / A / *, B - s3: // ведро / B / *. Всякий раз, когда добавляются новые файлы, у меня появляется сообщение кафки с полезной нагрузкой {'источник данных': 'A', имя файла: 's3: // bucket / A / file1', ... другие поля}. Файлы должны go в дельта-таблицу s3: // delta / A /, B - s3: // delta / B /
Как я могу принять их все в одном приложении Spark с минимальной задержкой? Поскольку необходимые данные постоянно поступают, звучат как потоковая передача. Но при потоковой передаче искр нужно заранее определить схему потока, и у меня есть разные источники с другой схемой, которые заранее не известны.
Создание выделенного приложения Spark для каждого источника данных не вариант - существует более 100 источников данных с очень поступают небольшие файлы. Наличие 100+ искровых приложений - пустая трата денег. Все должно быть загружено с использованием одного кластера умеренного размера.
Единственная идея, которая у меня есть сейчас: в процессе драйвера запустить нормального потребителя kafka, для каждой записи прочитать фрейм данных, обогатить дополнительными полями и сохранить его дельта стол. Больше параллелизма - потребляйте несколько сообщений и запускайте их во фьючерсах, чтобы несколько заданий выполнялись одновременно. Какой-то псевдокод в процессе драйвера:
val consumer = KafkaConsumer(...)
consumer.foreach{record =>
val ds = record.datasource
val file = record.filename
val df = spark.read.format(avro).load(file)
.withColumn('id', record.id)
val dest = s"s3://delta/${record.datasourceName}"
df.write.format('delta').save(dest)
consumer.commit(offset from record)
}
Звучит хорошо (и Po C показывает, что он работает), но мне интересно, есть ли другие варианты? Любые другие идеи приветствуются. Spark работает на платформе DataBricks.