У меня есть это приложение Spark, которое принимает поток Twitter.
Я добавил столбец времени:
timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
timestamp_df = tmp_df2.withColumn('time', unix_timestamp(lit(timestamp), 'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
Соберите нужные мне поля, которые не требуют сплющивания:
main_df = (
timestamp_df.selectExpr(['time', 'created_at', 'id',...])
)
Я сглаживаю другие части и преобразовываю список строк в строку:
entities_df = (
timestamp_df
.select(['time', 'id', explode('entities.user_mentions').alias('temp')])
.selectExpr(['time', 'id AS tmp_id', 'temp.screen_name'])
.withWatermark('time', '10 seconds')
.groupBy(
'tmp_id', window('time', '10 seconds', '5 seconds')
)
.agg(collect_set('screen_name').alias('tmp_screen_name'))
.withColumn('entities_user_mentions_screen_name', concat_ws(', ', 'tmp_screen_name'))
)
И затем соединяю эти 2 кадра данных, чтобы получить нужные мне данные:
final_df = (
main_df
.join(entities_df, main_df.id == entities_df.tmp_id)
.select(['created_at', 'id', ...])
)
Я получаю пустые фреймы данных при запуске.
Когда я запускаю коды на статических данных, используя следующие коды:
entities_df = (
timestamp_df
.select(['time', 'id', explode('entities.user_mentions').alias('temp')])
.selectExpr(['time', 'id AS tmp_id', 'temp.screen_name'])
.groupBy('tmp_id')
.agg(collect_set('screen_name').alias('tmp_screen_name'))
.withColumn('entities_user_mentions_screen_name', concat_ws(', ', 'tmp_screen_name'))
)
Если я запускаю вышеупомянутое (безВодяной знак) Я получаю эту ошибку:
Добавление режима вывода не поддерживается при потоковой агрегации на> потоковых DataFrames / DataSets без водяного знака
Может кто-нибудь сказать мне, что я 'я делаю неправильно?