Как программно добавить записи в таблицу кустов, используя al oop spark sql? - PullRequest
1 голос
/ 13 февраля 2020

Ниже я написал искровую логику c.

Высокий уровень: Код перебирает некоторые данные, извлекает некоторые записи партиями, применяет к ним некоторые логики c записывает и добавляет вывод в другую таблицу, созданную во время выполнения. Задание успешно завершено, но таблица пуста.

Подробно: Код должен создать кадр данных искры с 3 именами. Для каждого имени код создает запрос, используя имя в качестве условия фильтра, применяет некоторую логику c к возвращаемым данным и сохраняет их в новом кадре данных искры (output_spark_df). Затем этот фрейм данных преобразуется во временную таблицу и запускается. sql затем используется для вставки результатов в my_database.my_results. my_database.my_results должен иметь данные, загруженные в него 3 раза. Несмотря на успешное выполнение задания, my_database.my_results остается пустым.

Будем благодарны за любые указания.

if __name__ == "__main__":
    spark = SparkSession.builder.appName('batch_job').config("spark.kryoserializer.buffer.max", "2047mb").config("spark.sql.broadcastTimeout", "-1").config("spark.sql.autoBroadcastJoinThreshold","-1").getOrCreate()

    # Set up hive table to capture results
    #-------------------------------------
    spark.sql("DROP TABLE IF EXISTS my_database.my_results")
    spark.sql("CREATE TABLE IF NOT EXISTS my_database.my_results (field1 STRING, field2 INT) STORED AS PARQUET")

    names = spark.sql("select distinct name from my_database.my_input where name IN ('mike','jane','ryan')")

    for n in names:
        input_spark_df = spark.sql("select * from my_database.my_input where name = '{}'".format(n))
        .
        .
        .
        <APPLY LOGIC>
        .
        .
        .
        output_spark_df = <logic applied>

        # Capture output and append to pre-created hive table
        #----------------------------------------------------
        output_spark_df.registerTempTable("results")
        spark.sql("INSERT INTO TABLE my_database.my_results  SELECT * FROM results")

    spark.stop()

1 Ответ

0 голосов
/ 13 февраля 2020

names по-прежнему является фреймом данных в вашем коде, поскольку вы циклически перебираете фрейм данных, что приводит к отсутствию соответствующих записей внутри вашего для l oop.

Чтобы сделать names переменную как список, нам нужно сделать flatMap and collect, чтобы создать список, затем l oop над списком.

Fix:

# create names list
names=spark.sql("select distinct id as id from default.i").\
      rdd.\
      flatMap(lambda z:z).\
      collect()

# to print values in the list

for n in names:
    print(n)

Example with sample data:

#sample data
spark.sql("select distinct id as id from default.i").show()
#+---+
#| id|
#+---+
#|  1|
#|  2|
#|  3|
#+---+

#creating a list
names=spark.sql("select distinct id as id from default.i").flatMap(lambda z:z).collect()

#looping over the list
for n in names:
    spark.sql("select * from default.i where id = '{}'".format(n)).show()

#result 
#+---+
#| id|
#+---+
#|  1|
#+---+
#
#+---+
#| id|
#+---+
#|  2|
#+---+
#
#+---+
#| id|
#+---+
#|  3|
#+---+
...