У меня есть датафрейм, который выглядит так:
data.show()
+--------------------++-------------
| feature| id |
+--------------------++-------------
|[2.23668528E8, 1....| image1 |
|[2.23668528E8, 1....| image2 |
|[2.23668528E8, 1....| image3 |
|[2.23668528E8, 1....| image4 |
|[2.23668528E8, 1....| image5 |
Я пытаюсь найти сходство во всех парах. Эти функции в основном представляют собой простые векторы, извлеченные из модели vgg16. Количество строк составляет около 1 миллиона, и все функции хранятся в s3. Я использую pyspark, чтобы добиться этого. Я пробовал следующие шаги:
- Получить все пути к файлам из s3 в списке.
- Распределите все пути, используя sc.parallelize (список путей к файлам, 300), а затем вызовите функцию, которая загрузит функцию из s3 в каждом из исполнителей для каждого из путей.
- Преобразовать rdd в фрейм данных
- Наконец, вызовите LSH для этого кадра данных.
Проблема в том, что в этом случае задание не выполняется. Между тем, LSH должен был вычислить сходство для миллионов файлов за очень короткое время, как указано в следующих ссылках: https://databricks.com/blog/2017/05/09/detecting-abuse-scale-locality-sensitive-hashing-uber-engineering.html.
Я использовал параметры spark-submit:
spark-submit --verbose --deploy-mode cluster --num-executors 5 --executor-cores 40 --executor-memory 110G --driver-cores 20 --driver-memory 42G --conf spark.yarn.appMasterEnv.ENVIRONMENT=dev --conf spark.executorEnv.ENVIRONMENT=dev --conf spark.yarn.appMasterEnv.REGION=us-east-1 --conf spark.executorEnv.REGION=us-east-1 --conf spark.dynamicAllocation.enabled=false --conf spark.shuffle.service.enabled=false --conf spark.dynamicAllocation.executorIdleTimeout=1200s --conf spark.network.timeout=42000s s3://<mainfile.py>
Я также попытался скопировать все функции в HDFS и использовать тот же подход, но ничего не работает. Любая помощь будет принята с благодарностью?