Ниже я написал искровую логику c.
Высокий уровень: Код перебирает некоторые данные, извлекает некоторые записи партиями, применяет к ним некоторые логики c записывает и добавляет вывод в другую таблицу, созданную во время выполнения. Задание успешно завершено, но таблица пуста.
Подробно: Код должен создать кадр данных искры с 3 именами. Для каждого имени код создает запрос, используя имя в качестве условия фильтра, применяет некоторую логику c к возвращаемым данным и сохраняет их в новом кадре данных искры (output_spark_df). Затем этот фрейм данных преобразуется во временную таблицу и запускается. sql затем используется для вставки результатов в my_database.my_results. my_database.my_results должен иметь данные, загруженные в него 3 раза. Несмотря на успешное выполнение задания, my_database.my_results остается пустым.
Будем благодарны за любые указания.
if __name__ == "__main__":
spark = SparkSession.builder.appName('batch_job').config("spark.kryoserializer.buffer.max", "2047mb").config("spark.sql.broadcastTimeout", "-1").config("spark.sql.autoBroadcastJoinThreshold","-1").getOrCreate()
# Set up hive table to capture results
#-------------------------------------
spark.sql("DROP TABLE IF EXISTS my_database.my_results")
spark.sql("CREATE TABLE IF NOT EXISTS my_database.my_results (field1 STRING, field2 INT) STORED AS PARQUET")
names = spark.sql("select distinct name from my_database.my_input where name IN ('mike','jane','ryan')")
for n in names:
input_spark_df = spark.sql("select * from my_database.my_input where name = '{}'".format(n))
.
.
.
<APPLY LOGIC>
.
.
.
output_spark_df = <logic applied>
# Capture output and append to pre-created hive table
#----------------------------------------------------
output_spark_df.registerTempTable("results")
spark.sql("INSERT INTO TABLE my_database.my_results SELECT * FROM results")
spark.stop()