Как сессионизировать действия, используя спарк? - PullRequest
1 голос
/ 28 января 2020

импорт pyspark. sql .функции как f из pyspark. sql окно импорта

Я использую клей AWS для сеансовых действий, который выглядит следующим образом:

#actions are grouped by SessionId
{"ItemViewed": 0, "DisheartedItem": 0, "PageChanged": 1, "Source": "bing", "SessionId": 85151275, "ActionCounter": 1, "LoadedItems": 0, "Action": "pagechange", "AddedFilter": 0, "ItemAddedToCart": 0, "InstanceId": "i-0fbf940e44eb8b479", "RemovedFilter": 0, "HeartedItem": 0, "Date": "2020-01-27 19:45:25"}

Мой скрипт прост, он просто преобразует динамический c фрейм в фрейм данных и получает определенные переменные, а затем преобразует обратно.

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "database_name", table_name = "raw_actions", transformation_ctx = "datasource0")

#convert to dataframe
df = datasource0.toDF()

#group by SessionId
w = Window.partitionBy('SessionId')


grouped = df.select(
    'SessionId', 
    f.first('InstanceId').over(w).alias('InstanceId'),
    f.first('Source').over(w).alias('Source'),
    f.count('SessionId').over(w).alias('Actions'),
    #etc...
).dropDuplicates()

#convert back to dynamicframe
dynamicframe = DynamicFrame.fromDF(grouped, glueContext, 'dynamicframe')

Но я продолжаю получать эту ошибку, я не уверен, почему

AnalysisException: "cannot resolve '`SessionId`' given input columns: [];;\n'Project [first('SessionId, false) windowspecdefinition('SessionId, unspecifiedframe$()) AS SessionId#1, first('InstanceId, false) windowspecdefinition('SessionId, unspecifiedframe$()) AS InstanceId#3, first('Source, false) windowspecdefinition('SessionId, unspecifiedframe$()) AS Source#5]\n+- LogicalRDD false\n"

Я также пытался использовать:

grouped = df.select(
    df['SessionId'], 
    f.first('InstanceId').over(w).alias('InstanceId'),
    f.first('Source').over(w).alias('Source'),
    f.count('SessionId').over(w).alias('Actions'),
    #etc...
).dropDuplicates()

, и я получаю эту ошибку:

Cannot resolve column name "SessionId" among ();'
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...