Как обновить фрейм данных pyspark, используя foreach - PullRequest
0 голосов
/ 08 ноября 2019

У меня есть фрейм данных pyspark, и я хотел бы обрабатывать каждую строку и обновлять / удалять / вставлять строки на основе некоторой логики. Я пытался использовать "foreach" и "foreachPartition", но я не могу понять, как он будет возвращать измененные данные для обновления фактического фрейма данных

data = [

            {
                "city": "s",
                "latitude": "51",
                "longitude": "5",
                "region": "Europe",
                "date_range": "date_last_year",

            },
            {
                "city": "s",
                "latitude": "5",
                "longitude": "5.67",
                "region": "Europe",
                "date_range": "date_all_time",

            },
            {
                "city": "Aalborg",
                "latitude": "57.03",
                "longitude": "9.007",
                "region": "Europe",
                "date_range": "date_last_year",

            },
            {
                "city": "Aalborg",
                "latitude": "57.033",
                "longitude": "9.0007",
                "region": "Europe",
                "date_range": "date_last_year",

            },
            {
                "city": "Aalborg",
                "latitude": "57.0",
                "longitude": "9.97",
                "region": "Europe",
                "date_range": "date_last_year",

            },
            {
                "city": "Aarau",
                "latitude": "47.32",
                "longitude": "8.05",
                "region": "Europe",
                "date_range": "date_last_year",

            },    
]

from pyspark import SparkContext
from pyspark.sql import SQLContext, functions as sf

sc = SparkContext()
sqlContext = SQLContext(sc)

df = sc.parallelize(data).toDF()

def myfunction(row):
    if float(row.latitude) > 50:
        print('do_something')
        # need to access "df" to do some operations

df.foreach(myfunction)
df.show()

# output
do_something
do_something
do_something
do_something
+-------+--------------+--------+---------+------+                              
|   city|    date_range|latitude|longitude|region|
+-------+--------------+--------+---------+------+
|      s|date_last_year|      51|        5|Europe|
|      s| date_all_time|       5|     5.67|Europe|
|Aalborg|date_last_year|   57.03|    9.007|Europe|
|Aalborg|date_last_year|  57.033|   9.0007|Europe|
|Aalborg|date_last_year|    57.0|     9.97|Europe|
|  Aarau|date_last_year|   47.32|     8.05|Europe|
+-------+--------------+--------+---------+------+

Я хочу либо передать "df""в функцию foreach или вернуть и объединить их при вызове функции foreach. Как это сделать?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...