Ограничение S3-разделов, проверяемых при каждом выполнении запроса структурированной потоковой передачи. - PullRequest
0 голосов
/ 12 мая 2018

В настоящее время мы загружаем кучу файлов Avro на S3, которые разбиты по часам загрузки, и мы надеемся ETL их в Parquet и перераспределить через customerId события.

Мы установили решение с использованием искровой структурированной потоковой передачи, и оно кажется семантически правильным. В нашем коде есть несколько запросов, которые выглядят как

Dataset<Row> sourceStream = sparkSession.readStream()
  .format("com.databricks.spark.avro")
  .load("/avro/input/path");

sourceStream
  .drop("ingestion_hour")
  .withColumn("c_id", new Column("customer_id"))
  .repartition(new Column("c_id"))
  .writeStream()
  .trigger(Trigger.Once())
  .outputMode(OutputMode.Append())
  .format("parquet")
  .option("checkpointLocation", "/checkpoint/path/")
  .partitionBy("c_id")
  .start("/parquet/output/path");

Однако, по-видимому, существует проблема, когда число файлов с префиксом /avro/input/path постоянно увеличивается, что приводит к тому, что при каждом выполнении ETL перечисление файлов в S3 занимает все больше и больше времени для вычисления новых файлов. .

Я попытался разделить отдельный потоковый запрос на несколько потоковых запросов, которые потребляют каждый час, например, /avro/input/path/ingestion_hour=yyyy-MM-dd-HH и вывод в тот же приемник /parquet/output/path с путем контрольной точки /checkpoint/path/yyyy-MM-dd-HH. Затем я могу ограничить выполнение задания только запросами за прошедшие, скажем, 12 часов. Тем не менее, задание не похоже на разделение приемника между несколькими запросами, это правильная вещь, так как большинство записей не отображаются в выводе.

Мне интересно, есть ли какой-нибудь способ заставить этот подход работать или есть другой подход для ограничения числа файлов в списке?

...