источник потокового структурированного потокового чтения, считываемый с определенного раздела - PullRequest
1 голос
/ 19 сентября 2019

У меня есть папка на HDFS, как показано ниже, с файлами ORC:

/path/to/my_folder

Она содержит разделы:

/path/to/my_folder/dt=20190101
/path/to/my_folder/dt=20190103
/path/to/my_folder/dt=20190103
...

Теперь мне нужно обработать данные здесь с помощью потоковой передачи.A spark.readStream.format("orc").load("/path/to/my_folder") прекрасно работает.

Однако я не хочу обрабатывать всю таблицу, а скорее начинать только с определенного раздела, аналогичного определенному смещению kafka.

Как это можнобыть реализованным?Т.е. как я могу указать начальное состояние, откуда читать.

Исходное смещение источника потокового структурированного файла Spark утверждает, что такой функции нет.Их предложение использовать: latestFirst нежелательно для моего варианта использования, так как я не ставлю целью создание постоянно работающего потокового приложения, а скорее использую Trigger.Once как пакетное задание с хорошей потоковой семантикой дублирования сокращения иобработка данных, поступающих с опозданием

Если это недоступно, какой будет подходящий обходной путь?

edit

Запуск потока предупреждений с параметром ("latestFirst ", true) и опция (" maxFilesPerTrigger "," 1 ") с контрольной точкой, фиктивным приемником и огромным временем обработки.Таким образом, поток прогрева сохранит последнюю временную метку файла в контрольной точке.

Запустите реальный поток с параметром ("maxFileAge", "0"), реальный приемник, используя то же местоположение контрольной точки.В этом случае поток будет обрабатывать только новые доступные файлы.https://stackoverflow.com/a/51399134/2587904

Основываясь на этой идее, давайте рассмотрим пример:

# in bash
rm -rf data
mkdir -p data/dt=20190101
echo "1,1,1" >> data/dt=20190101/1.csv
echo "1,1,2" >> data/dt=20190101/2.csv
mkdir data/dt=20190102
echo "1,2,1" >> data/dt=20190102/1.csv
echo "1,2,2" >> data/dt=20190102/2.csv
mkdir data/dt=20190103
echo "1,3,1" >> data/dt=20190103/1.csv
echo "1,3,2" >> data/dt=20190103/2.csv
mkdir data/dt=20190104
echo "1,4,1" >> data/dt=20190104/1.csv
echo "1,4,2" >> data/dt=20190104/2.csv

spark-shell --conf spark.sql.streaming.schemaInference=true

// from now on in scala
val df = spark.readStream.csv("data")
df.printSchema
val query = df.writeStream.format("console").start
query.stop

// cleanup the data and start from scratch. 
// this time instead of outputting to the console, write to file
val query = df.writeStream.format("csv")
    .option("path", "output")
    .option("checkpointLocation", "checkpoint")
val started = query.start


// in bash
# generate new data
mkdir data/dt=20190105
echo "1,5,1" >> data/dt=20190105/1.csv
echo "1,5,2" >> data/dt=20190105/2.csv
echo "1,4,3" >> data/dt=20190104/3.csv

// in scala
started.stop
// cleanup the output, start later on with custom checkpoint
//bash: rm -rf output/*
val started = query.start

// bash
echo "1,4,3" >> data/dt=20190104/4.csv
started.stop

// *****************
//bash: rm -rf output/*

Все работает как задумано.Операция определяет, где контрольная точка помечает последний обработанный файл.Как определение контрольной точки может быть сгенерировано руками, например, все файлы в dt=20190101 и dt=20190102 были обработаны, и более поздние данные там больше не допускаются, и обработка должна продолжаться со всеми файлами, начиная с dt=20190103 и далее?

Я вижу, что искра генерирует:

  • коммитов
  • метаданных
  • смещений
  • источников
  • _spark-метаданные

файлы и папки.До сих пор я знаю только, что _spark-metadata можно игнорировать, чтобы установить начальное состояние / контрольную точку.

Но еще не выяснили (из других файлов), какие минимальные значения должны присутствовать, поэтому обработка начинается сdt=20190103 и далее.

edit 2

Теперь я знаю, что:

  • commitits / 0 должен присутствовать
  • метаданные должныприсутствовать
  • должны присутствовать смещения

, но могут быть очень общими:

v1
{"batchWatermarkMs":0,"batchTimestampMs":0,"conf":{"spark.sql.shuffle.partitions":"200"}}
{"logOffset":0}

Когда я пытался удалить один из уже обработанных и зафиксированных файловначиная с sources/0/0, запрос все еще выполняется, но: обрабатывается не только новые данные, большие, чем существующие зафиксированные данные, но и любые данные , в частности, файлы, которые я только что удалил из журнала.

Как я могу изменить это поведение, чтобы обрабатывать только данные, более свежие, чем исходное состояние ?

редактировать 3

Документы (https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-FileStreamSource.html), нотакже javadocs;) list getOffset

Максимальное смещение (getOffset) рассчитываетсяВыделил все файлы в пути, кроме файлов, начинающихся с _ (подчеркивание).

Звучит интересно, но до сих пор я не понял, как использовать его для решения моей проблемы.

Существует ли более простой способ достижения желаемой функциональности помимо создания пользовательской (копии) FileSource?

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L237

maxFileAge также звучит интересно.

Я начал работать с нестандартным источником файлового потока.Тем не менее, не в состоянии правильно создать его экземпляр.https://gist.github.com/geoHeil/6c0c51e43469ace71550b426cfcce1c1 При вызове:

val df = spark.readStream.format("org.apache.spark.sql.execution.streaming.StatefulFileStreamSource")
    .option("partitionState", "/path/to/data/dt=20190101")
    .load("data")

Операция завершается неудачно с:

InstantiationException: org.apache.spark.sql.execution.streaming.StatefulFileStreamSource
  at java.lang.Class.newInstance(Class.java:427)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:196)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:88)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:88)
  at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:159)
  ... 53 elided
Caused by: java.lang.NoSuchMethodException: org.apache.spark.sql.execution.streaming.StatefulFileStreamSource.<init>()
  at java.lang.Class.getConstructor0(Class.java:3082)
  at java.lang.Class.newInstance(Class.java:412)
  ... 59 more

Несмотря на то, что это в основном копия исходного источника.Что отличается?Почему конструктор не найден из https://github.com/apache/spark/blob/v2.2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L196? Но он отлично работает для https://github.com/apache/spark/blob/v2.2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L42

Четный:

touch -t 201801181205.09 data/dt=20190101/1.csv
touch -t 201801181205.09 data/dt=20190101/2.csv
  val df = spark.readStream
      .option("maxFileAge", "2d")
    .csv("data")

возвращает весь набор данных и не может фильтровать большинствоk текущих дней.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...