Есть по крайней мере несколько способов сделать это, переключившись с map
на связанное преобразование:
mapPartitions
mapPartitions
дает нам доступ к итератору для каждого раздела, поэтому мы можем просто делать вид, что в нем нет элементов, если время ожидания истекло:
val data = sc.parallelize(1 to 100)
val timeout = 10000
val start = System.currentTimeMillis
data.repartition(10).mapPartitions { iter =>
if (System.currentTimeMillis - start > timeout) Iterator.empty
else iter.map(x => { Thread.sleep(500); x + 1 })
}.count
В зависимости от вашей среды вам может потребоваться настроить таймауты в этом примере оболочки с искрой, но он должен давать различное количество результатов в зависимости от того, когда именно вы запустите преобразование относительно установки start
.
Обратите внимание, что должно быть значительно большее количество разделов, чем общее количество ядер-исполнителей, в противном случае все разделы начнутся немедленно, и пропустить нечего. Следовательно, для этого примера мы явно переделаем данные перед тем, как запустить mapPartitions
. Это может потребоваться или не потребоваться в зависимости от размера ваших данных и количества выделенных ядер.
flatMap
Более тонкий подход заключается в использовании flatMap
, который позволяет нам обрабатывать или пропускать каждую отдельную строку условно, через функцию, которая возвращает Option
(и просто возвращает None
, если время ожидания истекло);
// setup as before
data.flatMap{ x => if (System.currentTimeMillis - start > timeout) None
else Some({Thread.sleep(500); x + 1}) }.count
Этот подход не требует разбиения, но он сканирует все оставшиеся разделы даже после истечения времени ожидания (но не выполняет дорогостоящую обработку ни одной из строк).