Самый быстрый способ найти файлы, измененные за последние 'x' минут - PullRequest
1 голос
/ 02 августа 2020

У меня есть требование найти в каталоге файлы, измененные за последние 10 минут. Каталог постоянно обновляется, и каждый раз в нем будет около 50-60 тысяч файлов. Я использую приведенный ниже код для получения файлов:

import java.io.File
import java.time.Instant

val dir = new File("/path/to/dir") 
val files = dir.listFiles.toList.filter(f => f.getName.matches("some filter"))
files.filter(f => f.isFile && f.exists &&
    Instant.ofEpochMilli(f.lastModified).plus(10, MINUTES).isAfter(Instant.now))
    .toList.sortBy(_.lastModified)

Это занимает около 20-30 минут. Но я хочу получить результат менее чем за 10 минут. Я даже пробовал запустить это в нашем кластере has oop с помощью Spark. Это искровой код:

val sparkConfig = new SparkConf()
    .setAppName("findRecentFiles")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.shuffle.compress", "true")
    .set("spark.rdd.compress", "true")
val sc = new SparkContext(sparkConfig)
val rdd = sc.parallelize(files)
rdd.filter(f => f.isFile && f.exists &&
    Instant.ofEpochMilli(f.lastModified).plus(10, MINUTES).isAfter(Instant.now))
    .collect.toList.sortBy(_.lastModified)

Тем не менее, это занимает столько же времени. И я заметил одну вещь: фильтрация по имени файла выполняется быстро. Но добавление фильтра lastModified замедляет работу. Есть ли лучший способ быстрее получить результаты?

ОБНОВЛЕНИЕ Я обновил конфиги Spark и теперь могу получить результат менее чем за 10 минут. Раньше я запускал банку следующим образом:

spark-submit myJar.jar

Я изменил его на это:

spark-submit --deploy-mode client --queue SomeNonDefaultQueue --executor-memory 16g --num-executors 10 --executor-cores 1 --master yarn myJar.jar

Также удалил set("spark.rdd.compress", "true") из кода, так как это увеличивает время процессора, как объяснялось здесь - https://spark.apache.org/docs/2.3.0/configuration.html#compression -и-сериализация

1 Ответ

1 голос
/ 02 августа 2020

Проблема в том, что проверка stat() на предмет последнего изменения происходит после линейного поиска в каталоге для поиска имени. Если вы можете изменить формат каталога, добавьте подкаталоги (рассчитанные по имени файла) и попытайтесь сгруппировать количество записей в каждом подкаталоге до ~ 1000.

В противном случае создайте карту name: lastModified и используйте WatchService для обновления карты при возникновении события.

...