Я не эксперт ни по Spark SQL API, ни по основному RDD.
Но, зная механизм оптимизации Catalyst, я бы ожидал, что Spark попытается минимизировать усилия в памяти.
Это моя ситуация: у меня есть, скажем, две таблицы
TABLE GenericOperation (ID, CommonFields...)
TABLE SpecificOperation (OperationID, SpecificFields...)
Они оба довольно большие (~ 500M, не большие данные, но невозможно иметь какцелое в памяти на стандартном сервере приложений)
Тем не менее, предположим, что мне нужно извлечь с помощью Spark (часть более крупного варианта использования) все экземпляры SpecificOperation
, которые соответствуют определенному условию для полей, принадлежащихGenericOperation
.
Это код, который я использую:
val gOps = spark.read.jdbc(db.connection, "GenericOperation", db.properties)
val sOps = spark.read.jdbc(db.connection, "SpecificOperation", db.properties)
val joined = sOps.join(gOps).where("ID = OperationID")
joined.where("CommonField= 'SomeValue'").select("SpecificField").show()
Проблема в том, что когда дело доходит до выполнения вышеизложенного, я вижу из SQL Profiler, что Spark не выполняет объединение набазы данных, а скорее извлекает все OperationID
из SpecificOperation
, и тогда я предполагаю, что будет выполнено все слияние в памяти.Поскольку никакой фильтр не применим к SpecificOperation
, такое извлечение принесло бы много, слишком много данных в конечную систему.
Можно ли написать выше, чтобы объединение былотребовал напрямую к дбмс?Или это зависит от какой-то волшебной конфигурации Spark, о которой я не знаю?
Конечно, я мог бы просто жестко закодировать объединение как подзапрос при извлечении, но в моем случае это невозможно, поскольку операторы должны быть созданы ввремя выполнения, начиная с простых строительных блоков.Следовательно, мне нужно реализовать это, начиная с двух spark.sql.DataFrame
уже собранных
. В качестве примечания, я запускаю это с Spark 2.3.0 для Scala 2.11 против экземпляра базы данных SQL Server 2016.