Мне нужно прочитать данные из базы данных Oracle, используя JDBC с Spark (2.2). Чтобы свести к минимуму передаваемые данные, я использую push-запрос, который уже фильтрует данные для загрузки. Эти данные затем добавляются в существующую таблицу Hive.
Чтобы регистрировать, что было загружено, я считаю записи, загруженные через JDBC. Мой код в основном выглядит так:
val query = "(select a, b from table where c = 1) t"
val myDF = spark.read.jdbc(jdbc_url, query, partitionColumn, lowerBound, upperBound, 10, connectionProperties).cache()
myDF.write.mode(SaveMode.Append)
.format("parquet")
.saveAsTable("my_table_name")
val numRecs = myDF.count()
Я предположил, что из-за cache () DataFrame считывается один раз через JDBC, сохраняется и используется для подсчета записей. Но когда я смотрю на сеансы, созданные в Oracle, я вижу, что сама операция подсчета создает 10 сеансов в базе данных Oracle. Сначала я вижу 10 сеансов, которые в основном выглядят так:
SELECT * FROM (select a, b from table where c = 1) t WHERE a >= x AND < a y
И после того, как это будет сделано, появятся еще 10 сеансов:
SELECT 1 FROM (select a, b from table where c = 1) t WHERE a >= x AND < a y
Похоже, что Spark загружает данные из источника JDBC только для подсчета записей, когда этого уже должно быть достаточно для использования уже загруженных данных. Как это можно предотвратить, и Spark принудительно считывает данные только один раз из источника JDBC?
UPDATE
Оказывается, я был слеп: в моем коде был еще один count () до вызова saveAsTable. Так что в этом есть смысл, первое действие, вызванное для DataFrame, было действительно count (). После устранения этого он вел себя как ожидалось.