У меня есть папка на HDFS, как показано ниже, с файлами ORC:
/path/to/my_folder
Она содержит разделы:
/path/to/my_folder/dt=20190101
/path/to/my_folder/dt=20190103
/path/to/my_folder/dt=20190103
...
Теперь мне нужно обработать данные здесь с помощью потоковой передачи.A spark.readStream.format("orc").load("/path/to/my_folder")
прекрасно работает.
Однако я не хочу обрабатывать всю таблицу, а скорее начинать только с определенного раздела, аналогичного определенному смещению kafka.
Как это можнобыть реализованным?Т.е. как я могу указать начальное состояние, откуда читать.
Исходное смещение источника потокового структурированного файла Spark утверждает, что такой функции нет.Их предложение использовать: latestFirst
нежелательно для моего варианта использования, так как я не ставлю целью создание постоянно работающего потокового приложения, а скорее использую Trigger.Once
как пакетное задание с хорошей потоковой семантикой дублирования сокращения иобработка данных, поступающих с опозданием
Если это недоступно, какой будет подходящий обходной путь?
edit
Запуск потока предупреждений с параметром ("latestFirst ", true) и опция (" maxFilesPerTrigger "," 1 ") с контрольной точкой, фиктивным приемником и огромным временем обработки.Таким образом, поток прогрева сохранит последнюю временную метку файла в контрольной точке.
Запустите реальный поток с параметром ("maxFileAge", "0"), реальный приемник, используя то же местоположение контрольной точки.В этом случае поток будет обрабатывать только новые доступные файлы.https://stackoverflow.com/a/51399134/2587904
Основываясь на этой идее, давайте рассмотрим пример:
# in bash
rm -rf data
mkdir -p data/dt=20190101
echo "1,1,1" >> data/dt=20190101/1.csv
echo "1,1,2" >> data/dt=20190101/2.csv
mkdir data/dt=20190102
echo "1,2,1" >> data/dt=20190102/1.csv
echo "1,2,2" >> data/dt=20190102/2.csv
mkdir data/dt=20190103
echo "1,3,1" >> data/dt=20190103/1.csv
echo "1,3,2" >> data/dt=20190103/2.csv
mkdir data/dt=20190104
echo "1,4,1" >> data/dt=20190104/1.csv
echo "1,4,2" >> data/dt=20190104/2.csv
spark-shell --conf spark.sql.streaming.schemaInference=true
// from now on in scala
val df = spark.readStream.csv("data")
df.printSchema
val query = df.writeStream.format("console").start
query.stop
// cleanup the data and start from scratch.
// this time instead of outputting to the console, write to file
val query = df.writeStream.format("csv")
.option("path", "output")
.option("checkpointLocation", "checkpoint")
val started = query.start
// in bash
# generate new data
mkdir data/dt=20190105
echo "1,5,1" >> data/dt=20190105/1.csv
echo "1,5,2" >> data/dt=20190105/2.csv
echo "1,4,3" >> data/dt=20190104/3.csv
// in scala
started.stop
// cleanup the output, start later on with custom checkpoint
//bash: rm -rf output/*
val started = query.start
// bash
echo "1,4,3" >> data/dt=20190104/4.csv
started.stop
// *****************
//bash: rm -rf output/*
Все работает как задумано.Операция определяет, где контрольная точка помечает последний обработанный файл.Как определение контрольной точки может быть сгенерировано руками, например, все файлы в dt=20190101
и dt=20190102
были обработаны, и более поздние данные там больше не допускаются, и обработка должна продолжаться со всеми файлами, начиная с dt=20190103
и далее?
Я вижу, что искра генерирует:
- коммитов
- метаданных
- смещений
- источников
- _spark-метаданные
файлы и папки.До сих пор я знаю только, что _spark-metadata
можно игнорировать, чтобы установить начальное состояние / контрольную точку.
Но еще не выяснили (из других файлов), какие минимальные значения должны присутствовать, поэтому обработка начинается сdt=20190103
и далее.
edit 2
Теперь я знаю, что:
- commitits / 0 должен присутствовать
- метаданные должныприсутствовать
- должны присутствовать смещения
, но могут быть очень общими:
v1
{"batchWatermarkMs":0,"batchTimestampMs":0,"conf":{"spark.sql.shuffle.partitions":"200"}}
{"logOffset":0}
Когда я пытался удалить один из уже обработанных и зафиксированных файловначиная с sources/0/0
, запрос все еще выполняется, но: обрабатывается не только новые данные, большие, чем существующие зафиксированные данные, но и любые данные , в частности, файлы, которые я только что удалил из журнала.
Как я могу изменить это поведение, чтобы обрабатывать только данные, более свежие, чем исходное состояние ?
редактировать 3
Документы (https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-FileStreamSource.html), нотакже javadocs;) list getOffset
Максимальное смещение (getOffset) рассчитываетсяВыделил все файлы в пути, кроме файлов, начинающихся с _ (подчеркивание).
Звучит интересно, но до сих пор я не понял, как использовать его для решения моей проблемы.
Существует ли более простой способ достижения желаемой функциональности помимо создания пользовательской (копии) FileSource?
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L237
maxFileAge
также звучит интересно.
Я начал работать с нестандартным источником файлового потока.Тем не менее, не в состоянии правильно создать его экземпляр.https://gist.github.com/geoHeil/6c0c51e43469ace71550b426cfcce1c1 При вызове:
val df = spark.readStream.format("org.apache.spark.sql.execution.streaming.StatefulFileStreamSource")
.option("partitionState", "/path/to/data/dt=20190101")
.load("data")
Операция завершается неудачно с:
InstantiationException: org.apache.spark.sql.execution.streaming.StatefulFileStreamSource
at java.lang.Class.newInstance(Class.java:427)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:196)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:88)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:88)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:159)
... 53 elided
Caused by: java.lang.NoSuchMethodException: org.apache.spark.sql.execution.streaming.StatefulFileStreamSource.<init>()
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.newInstance(Class.java:412)
... 59 more
Несмотря на то, что это в основном копия исходного источника.Что отличается?Почему конструктор не найден из https://github.com/apache/spark/blob/v2.2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L196? Но он отлично работает для https://github.com/apache/spark/blob/v2.2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L42
Четный:
touch -t 201801181205.09 data/dt=20190101/1.csv
touch -t 201801181205.09 data/dt=20190101/2.csv
val df = spark.readStream
.option("maxFileAge", "2d")
.csv("data")
возвращает весь набор данных и не может фильтровать большинствоk текущих дней.