Как получить разные кадры данных Spark за одно задание искры - PullRequest
0 голосов
/ 04 мая 2020
• 1000 для сохранения в таблицах DeltaLake. Каждый источник данных - это просто папка в s3 с файлами avro. У каждого источника данных своя схема. Каждый источник данных должен сохраняться в собственной таблице DeltaLake. Требуется небольшое преобразование, кроме avro -> delta, только обогащение некоторыми дополнительными полями, полученными из имени файла. Новые файлы добавляются с умеренной скоростью, от одного раза в минуту до одного раза в день, в зависимости от источника данных. У меня есть уведомление kafka, когда появляются новые данные, с описанием того, какие данные и путь к файлу s3.

Предположим, есть два источника данных - A и B. A - это файлы s3: // bucket / A / *, B - s3: // ведро / B / *. Всякий раз, когда добавляются новые файлы, у меня появляется сообщение кафки с полезной нагрузкой {'источник данных': 'A', имя файла: 's3: // bucket / A / file1', ... другие поля}. Файлы должны go в дельта-таблицу s3: // delta / A /, B - s3: // delta / B /

Как я могу принять их все в одном приложении Spark с минимальной задержкой? Поскольку необходимые данные постоянно поступают, звучат как потоковая передача. Но при потоковой передаче искр нужно заранее определить схему потока, и у меня есть разные источники с другой схемой, которые заранее не известны.

Создание выделенного приложения Spark для каждого источника данных не вариант - существует более 100 источников данных с очень поступают небольшие файлы. Наличие 100+ искровых приложений - пустая трата денег. Все должно быть загружено с использованием одного кластера умеренного размера.

Единственная идея, которая у меня есть сейчас: в процессе драйвера запустить нормального потребителя kafka, для каждой записи прочитать фрейм данных, обогатить дополнительными полями и сохранить его дельта стол. Больше параллелизма - потребляйте несколько сообщений и запускайте их во фьючерсах, чтобы несколько заданий выполнялись одновременно. Какой-то псевдокод в процессе драйвера:

val consumer = KafkaConsumer(...)
consumer.foreach{record =>
    val ds = record.datasource
    val file = record.filename
    val df = spark.read.format(avro).load(file)
        .withColumn('id', record.id)
    val dest = s"s3://delta/${record.datasourceName}"
    df.write.format('delta').save(dest)
    consumer.commit(offset from record)
}

Звучит хорошо (и Po C показывает, что он работает), но мне интересно, есть ли другие варианты? Любые другие идеи приветствуются. Spark работает на платформе DataBricks.

1 Ответ

0 голосов
/ 05 мая 2020

Spark не ограничивает вас наличием приложения Spark для каждого приема источника данных, вы можете сгруппировать источники данных в пару приложений Spark или go с одним приложением Spark для всех источников данных, что является возможным подходом, если искра приложение имеет достаточно ресурсов для приема и обработки всего источника данных.

Вы можете сделать что-то вроде:

object StreamingJobs extends SparkApp {

  // consume from Kafka Topic 1
  StreamProcess_1.runStream(spark)

  // consume from Kafka Topic 2
  StreamProcess_2.runStream(spark)

  //  consume from Kafka Topic n
  StreamProcess_N.runStream(spark)

  // wait until termination
  spark.streams.awaitAnyTermination()

}

и, возможно, еще одно искровое задание для пакетной обработки

object BatchedJobs extends SparkApp {

  // consume from data source 1
  BatchedProcess_1.run(spark)

  // consume from  data source 2
  BatchedProcess_2.run(spark)

  //  consume from  data source n
  BatchedProcess_N.run(spark) 

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...