Обновите старые записи новой информацией в pyspark без перезаписи - PullRequest
0 голосов
/ 30 мая 2019

Я хочу обновить старые записи на основе информации, которую я могу получить или не получить из нового экземпляра той же записи в pyspark.Вот как выглядит старая таблица / фрейм данных

| FirstName | LastName | JoinDate  | SnapshotBeginDate | SnapshotEndDate |           
-------------------------------------------------------------------------
| John      | Doe      |2017-04-05 | 2017-05-04        | 2099-12-31      |
-------------------------------------------------------------------------
| Jane      | Smith    |2018-04-05 | 2017-05-04        | 2099-12-31 |
-------------------------------------------------------------------------

Я не хочу просто добавлять новые данные в существующий фрейм данных.Также я не хочу перезаписывать существующую запись.Вместо этого я хотел бы обновить snapshotEndDate старой записи.

Например:

| FirstName | LastName | JoinDate  | SnapshotBeginDate | SnapshotEndDate |           
-------------------------------------------------------------------------
| John      | Doe      |2017-04-05 | 2017-05-04        | 2019-04-03      |
-------------------------------------------------------------------------
| Jane      | Smith    |2018-04-05 | 2017-05-04        | 2019-04-03|
-------------------------------------------------------------------------
| John      | Doe      |2017-04-05 | 2019-04-03        | 2099-12-31|
-------------------------------------------------------------------------
| Jane      | Smith    |2018-04-05 | 2019-04-03        | 2099-12-31|
-------------------------------------------------------------------------

1 Ответ

0 голосов
/ 01 июня 2019

Первое, что вам нужно сделать, это создать два фрейма данных из ваших данных (dfold и dfnew в следующем примере)

import datetime
import pyspark.sql.functions as F
l = [
 ('John',      'Doe'    ,  '2017-04-05',  '2017-05-04' ,        '2099-12-31'),
 ('Jane' ,      'Smith'  ,  '2018-04-05',  '2017-05-04' ,        '2099-12-31')
    ]

columns = [     'FirstName' , 'LastName', 'JoinDate'  , 'SnapshotBeginDate' , 'SnapshotEndDate']

dfold=spark.createDataFrame(l, columns)
dfold = dfold.withColumn('SnapshotBeginDate',   F.to_date(dfold.SnapshotBeginDate,  'yyyy-MM-dd'))
dfold = dfold.withColumn('SnapshotEndDate',   F.to_date(dfold.SnapshotEndDate,  'yyyy-MM-dd'))
dfnew = dfold

Вы можете обновить столбец SnapshotEndDate dfold и SnapshotBeginDate для dfnew с функцией withColumn .Эта функция позволяет применить операцию к столбцу.Вам также нужна текущая дата для обновления значений.Модуль pytime datetime предоставляет такую ​​функциональность (если вам не нужна текущая дата, просто укажите любую другую дату в виде строки), но он не возвращает столбец.Чтобы превратить возвращаемый объект в столбец, мы можем использовать функцию pyspark lit .

dfold= dfold.withColumn('SnapshotEndDate', F.lit(datetime.date.today()))
dfnew= dfnew.withColumn('SnapshotBeginDate', F.lit(datetime.date.today()))
dfold.union(dfnew).show()

Вывод:

+---------+--------+----------+-----------------+---------------+ 
|FirstName|LastName|  JoinDate|SnapshotBeginDate|SnapshotEndDate| 
+---------+--------+----------+-----------------+---------------+ 
|     John|     Doe|2017-04-05|       2017-05-04|     2019-06-01| 
|     Jane|   Smith|2018-04-05|       2017-05-04|     2019-06-01| 
|     John|     Doe|2017-04-05|       2019-06-01|     2099-12-31| 
|     Jane|   Smith|2018-04-05|       2019-06-01|     2099-12-31| 
+---------+--------+----------+-----------------+---------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...