спарк конвертировать несколько строк в одну строку с несколькими коллекциями - PullRequest
0 голосов
/ 21 января 2020

Я ищу идеи о том, как решить ниже сценарий. Мой пример использования - java Искра, но я ищу идеи о том, как сделать это независимо от языка, поскольку у меня закончились идеи

У меня есть неструктурированные данные, как показано ниже

98480|PERSON|TOM|GREER|1982|12|27
98480|PHONE|CELL|732|201|6789
98480|PHONE|HOME|732|123|9876
98480|ADDR|RES|102|JFK BLVD|PISCATAWAY|NJ|08854
98480|ADDR|OFF|211|EXCHANGE PL|JERSEY CITY|NJ|07302
98481|PERSON|LIN|JASSOY|1976|09|15
98481|PHONE|CELL|908|398|3389
98481|PHONE|HOME|917|363|2647
98481|ADDR|RES|111|JOURNAL SQ|JERSEY CITY|NJ|07704
98481|ADDR|OFF|365|DOWNTOWN NEWYORK|NEWYORK CITY|NY|10001

i я пытаюсь преобразовать их в строку с персоной данных с набором телефона и адресом, как показано ниже, в основном одна строка для каждого человекаId

+--------+------+---------+--------+----+-----+---+--------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------+
|personId|type  |firstName|lastName|year|month|day|Phone                                                                | addr                                                                                                                 |                                                                                                                                                               |
+--------+------+---------+--------+----+-----+---+--------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------+
|98481   |PERSON|LIN      |JASSOY  |1976|09   |15 |[[PHONE, HOME, 917, 363, 2647], [PHONE, CELL, 908, 398, 3389]]       | [[ADDR, OFF, 365, DOWNTOWN NEWYORK, NEWYORK CITY, NY, 10001], [ADDR, RES, 111, JOURNAL SQ, JERSEY CITY, NJ, 07704]]  |
|98480   |PERSON|TOM      |GREER   |1982|12   |27 |[[PHONE, HOME, 732, 123, 9876], [PHONE, CELL, 732, 201, 6789]]       | [[ADDR, RES, 102, JFK BLVD, PISCATAWAY, NJ, 08854], [ADDR, OFF, 211, EXCHANGE PL, JERSEY CITY, NJ, 07302]]           |
+--------+------+---------+--------+----+-----+---+--------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------+

с кодом ниже

Dataset<Row> dataset = groupedDataset
                .agg(collect_set(struct(phoneRow.col("type").as("collType"), phoneRow.col("phoneType").as("phoneType"),
                        phoneRow.col("areaCode").as("areaCode"), phoneRow.col("phoneMiddle").as("phoneMiddle"),
                        phoneRow.col("ext").as("ext"), addressRow.col("type").as("collType"),
                        addressRow.col("addrType").as("addrType"), addressRow.col("addr1").as("rowType"),
                        addressRow.col("addr2").as("addr2"), addressRow.col("city").as("city"),
                        addressRow.col("state").as("state"), addressRow.col("zipCode").as("zipCode"))).as("addrPhone"));

вывод ниже, но не в формате, который я ищу

+--------+------+---------+--------+----+-----+---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|personId|type  |firstName|lastName|year|month|day|addrPhone                                                                                                                                                                                                                                                                                                                                                 |
+--------+------+---------+--------+----+-----+---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|98481   |PERSON|LIN      |JASSOY  |1976|09   |15 |[[PHONE, HOME, 917, 363, 2647, ADDR, OFF, 365, DOWNTOWN NEWYORK, NEWYORK CITY, NY, 10001], [PHONE, HOME, 917, 363, 2647, ADDR, RES, 111, JOURNAL SQ, JERSEY CITY, NJ, 07704], [PHONE, CELL, 908, 398, 3389, ADDR, RES, 111, JOURNAL SQ, JERSEY CITY, NJ, 07704], [PHONE, CELL, 908, 398, 3389, ADDR, OFF, 365, DOWNTOWN NEWYORK, NEWYORK CITY, NY, 10001]]|
|98480   |PERSON|TOM      |GREER   |1982|12   |27 |[[PHONE, HOME, 732, 123, 9876, ADDR, RES, 102, JFK BLVD, PISCATAWAY, NJ, 08854], [PHONE, CELL, 732, 201, 6789, ADDR, RES, 102, JFK BLVD, PISCATAWAY, NJ, 08854], [PHONE, CELL, 732, 201, 6789, ADDR, OFF, 211, EXCHANGE PL, JERSEY CITY, NJ, 07302], [PHONE, HOME, 732, 123, 9876, ADDR, OFF, 211, EXCHANGE PL, JERSEY CITY, NJ, 07302]]                  |
+--------+------+---------+--------+----+-----+---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

ищу идеи для исправления вышеупомянутой проблемы

обновление: я смог получить результат, как и ожидалось, но я не уверен, насколько он эффективен, и похоже, что у него много кода с множеством объединений и фреймов данных. это пример данных, с которыми я играю, чтобы понять искру, но реальные данные, с которыми я бы работал, будут иметь множество сложных преобразований, и этот код не выглядит эффективным

здесь обновленный код

Dataset<Row> groupedPhoneDataSet = groupedDataset.agg(collect_set(struct(phoneRow.col("type").as("phColType"),
                phoneRow.col("phoneType").as("phoneType"), phoneRow.col("areaCode").as("areaCode"),
                phoneRow.col("phoneMiddle").as("phoneMiddle"), phoneRow.col("ext").as("ext"))).as("phoneRec"));

        Dataset<Row> groupedAddrDataSet = groupedDataset
                .agg(collect_set(struct(addressRow.col("type").as("addrColType"),
                        addressRow.col("addrType").as("addrType"), addressRow.col("addr1").as("addr1"),
                        addressRow.col("addr2").as("addr2"), addressRow.col("city").as("city"),
                        addressRow.col("state").as("state"), addressRow.col("zipCode").as("zipCode"))).as("addrRec"));

        Dataset<Row> finalDataSet = groupedAddrDataSet
                .join(groupedPhoneDataSet,
                        groupedAddrDataSet.col("personId").equalTo(groupedPhoneDataSet.col("personId")))
                .select(groupedPhoneDataSet.col("personId"), groupedPhoneDataSet.col("type"),
                        groupedPhoneDataSet.col("firstName"), groupedPhoneDataSet.col("lastName"),
                        groupedPhoneDataSet.col("year"), groupedPhoneDataSet.col("month"),
                        groupedPhoneDataSet.col("day"), col("phoneRec"), col("addrRec"));

вот вывод, который я получил

+--------+------+---------+--------+----+-----+---+--------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+
|personId|type  |firstName|lastName|year|month|day|phoneRec                                                      |addrRec                                                                                                            |
+--------+------+---------+--------+----+-----+---+--------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+
|98481   |PERSON|LIN      |JASSOY  |1976|09   |15 |[[PHONE, CELL, 908, 398, 3389], [PHONE, HOME, 917, 363, 2647]]|[[ADDR, RES, 111, JOURNAL SQ, JERSEY CITY, NJ, 07704], [ADDR, OFF, 365, DOWNTOWN NEWYORK, NEWYORK CITY, NY, 10001]]|
|98480   |PERSON|TOM      |GREER   |1982|12   |27 |[[PHONE, CELL, 732, 201, 6789], [PHONE, HOME, 732, 123, 9876]]|[[ADDR, OFF, 211, EXCHANGE PL, JERSEY CITY, NJ, 07302], [ADDR, RES, 102, JFK BLVD, PISCATAWAY, NJ, 08854]]         |
+--------+------+---------+--------+----+-----+---+--------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+

есть ли способ, которым я могу сделать это без создания большого количества фреймов данных

Ответы [ 2 ]

0 голосов
/ 21 января 2020

IIU C, вы можете читать ваши данные в линейном режиме, выполнять некоторые манипуляции с данными, а затем использовать collect_list или collect_set , чтобы получить желаемый результат:

from pyspark.sql.functions import expr, substring_index

# read the files into dataframe with a single column named `value`
df = spark.read.text('/path/to/file/')

Разделить строки на два столбца: personId (1-е поле) и столбец ArrayType data (остальные поля):

df1 = df.withColumn('personId', substring_index('value', '|', 1)) \
    .selectExpr('personId', 'split(substr(value, length(personId)+2), "[|]") as data')    
#+--------+--------------------+
#|personId|                data|
#+--------+--------------------+
#|   98480|[PERSON, TOM, GRE...|
#|   98480|[PHONE, CELL, 732...|
#|   98480|[PHONE, HOME, 732...|
#|   98480|[ADDR, RES, 102, ...|
#|   98480|[ADDR, OFF, 211, ...|
#|   98481|[PERSON, LIN, JAS...|
#|   98481|[PHONE, CELL, 908...|
#|   98481|[PHONE, HOME, 917...|
#|   98481|[ADDR, RES, 111, ...|
#|   98481|[ADDR, OFF, 365, ...|
#+--------+--------------------+

Использовать groupby + collect_list (или collect_set). обратите внимание, что collect_list / collect_set пропустит элементы со значениями NULL . Ниже мы используем collect_list для создания 3 столбцов ArrayType на основе значения data[0]:

(1) Если данные [0] == PHONE или ADDR, преобразовать data в StructType, результатом будет массив Structs.

(2) если data [0] == PERSON, оставить data как ArrayType, взять первый элемент (с именем d1) из полученного массива массивов и затем использовать selectExpr для преобразования этого массив d1 на 6 отдельных столбцов.

df1.groupby('personId') \
    .agg(
      expr("collect_list(IF(data[0] = 'PERSON', data, NULL))[0] as d1"),
      expr("""
        collect_list(
          IF(data[0] = 'PHONE'
          , (data[0] as phColType,
             data[1] as phoneType,
             data[2] as areaCode,
             data[3] as phoneMiddle,
             data[4] as ext)
          , NULL)
        ) AS Phone"""),
      expr("""
        collect_list(
          IF(data[0] = 'ADDR'
          , (data[0] as addrColType,
             data[1] as addrType,
             data[2] as addr1,
             data[3] as addr2,
             data[4] as city,
             data[5] as state,
             data[6] as zipCode)
          , NULL)
        ) AS Addr""")
    ).selectExpr(
      'personId',
      'd1[0] as type',
      'd1[1] as firstName',
      'd1[2] as lastName',
      'd1[3] as year',
      'd1[4] as month',
      'd1[5] as day',
      'Phone',
      'Addr'
    ).show(truncate=False)

Результат (и Phone, и Addr являются массивом структур):

+--------+------+---------+--------+----+-----+---+--------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+
|personId|type  |firstName|lastName|year|month|day|Phone                                                         |Addr                                                                                                               |
+--------+------+---------+--------+----+-----+---+--------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+
|98481   |PERSON|LIN      |JASSOY  |1976|09   |15 |[[PHONE, CELL, 908, 398, 3389], [PHONE, HOME, 917, 363, 2647]]|[[ADDR, RES, 111, JOURNAL SQ, JERSEY CITY, NJ, 07704], [ADDR, OFF, 365, DOWNTOWN NEWYORK, NEWYORK CITY, NY, 10001]]|
|98480   |PERSON|TOM      |GREER   |1982|12   |27 |[[PHONE, CELL, 732, 201, 6789], [PHONE, HOME, 732, 123, 9876]]|[[ADDR, RES, 102, JFK BLVD, PISCATAWAY, NJ, 08854], [ADDR, OFF, 211, EXCHANGE PL, JERSEY CITY, NJ, 07302]]         |
+--------+------+---------+--------+----+-----+---+--------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+
0 голосов
/ 21 января 2020

если у вас все в порядке с созданием нескольких фреймов данных, разбейте каждый тип записей на разные фреймы данных и выполните группировку по personId, объедините все три фрейма данных по идентификатору человека.

найдите приведенный ниже код, который Я пытался, дайте мне знать, если это решит вашу проблему.

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.functions.{col, collect_list, struct}

    object Test {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setAppName("Leads Processing Job").setMaster("local[1]")
        val sparkContext = new org.apache.spark.SparkContext(conf)
        val sqlContext = new org.apache.spark.sql.hive.HiveContext(sparkContext)
        val df = sqlContext.read.option("delimiter","|").format("csv").load("data.csv")
        df.printSchema()
        val df_person = df.where("_c1 = 'PERSON'")
          .select(col("_c0").as("personId"),col("_c1").as("type")
            ,col("_c2").as("firstName"),col("_c3").as("lastName")
            ,col("_c4").as("year"),col("_c5").as("month")
            ,col("_c6").as("day"))

        val df_address = df.where("_c1 = 'ADDR'")
        val df_phone = df.where("_c1 = 'PHONE'")
        val df_addr_f = df_address
          .withColumn("addr",struct(col("_c1"),col("_c2")
            ,col("_c3"),col("_c4"),col("_c5"),col("_c6")))
          .groupBy(col("_c0").as("personId")).agg(collect_list(col("addr")).as("addr"))

        val df_phone_f = df_phone.groupBy(col("_c0").as("personId")).agg(collect_list(struct(col("_c1"),col("_c2")
          ,col("_c3"),col("_c4"),col("_c5"))).as("Phone"))

        val final_df = df_person.join(df_addr_f,"personId").join(df_phone_f,"personId")

        final_df.show(false)
      }
    }

Он производит ниже вывода,

            +--------+------+---------+--------+----+-----+---+-----------------------------------------------------------------------------------------------------+--------------------------------------------------------------+
            |personId|type  |firstName|lastName|year|month|day|addr                                                                                                 |Phone                                                         |
            +--------+------+---------+--------+----+-----+---+-----------------------------------------------------------------------------------------------------+--------------------------------------------------------------+
            |98480   |PERSON|TOM      |GREER   |1982|12   |27 |[[ADDR, RES, 102, JFK BLVD, PISCATAWAY, NJ], [ADDR, OFF, 211, EXCHANGE PL, JERSEY CITY, NJ]]         |[[PHONE, CELL, 732, 201, 6789], [PHONE, HOME, 732, 123, 9876]]|
            |98481   |PERSON|LIN      |JASSOY  |1976|09   |15 |[[ADDR, RES, 111, JOURNAL SQ, JERSEY CITY, NJ], [ADDR, OFF, 365, DOWNTOWN NEWYORK, NEWYORK CITY, NY]]|[[PHONE, CELL, 908, 398, 3389], [PHONE, HOME, 917, 363, 2647]]|
            +--------+------+---------+--------+----+-----+---+-----------------------------------------------------------------------------------------------------+--------------------------------------------------------------+
...