Структурированная потоковая передача - объединение двух фреймов данных из одного источника потоковой передачи - PullRequest
0 голосов
/ 22 февраля 2019

У меня есть это приложение Spark, которое принимает поток Twitter.

Я добавил столбец времени:

timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
timestamp_df = tmp_df2.withColumn('time', unix_timestamp(lit(timestamp), 'yyyy-MM-dd HH:mm:ss').cast("timestamp"))

Соберите нужные мне поля, которые не требуют сплющивания:

main_df = (
    timestamp_df.selectExpr(['time', 'created_at', 'id',...])
)

Я сглаживаю другие части и преобразовываю список строк в строку:

entities_df = (
    timestamp_df
    .select(['time', 'id', explode('entities.user_mentions').alias('temp')])
    .selectExpr(['time', 'id AS tmp_id', 'temp.screen_name'])
    .withWatermark('time', '10 seconds')
    .groupBy(
        'tmp_id', window('time', '10 seconds', '5 seconds')
    )
    .agg(collect_set('screen_name').alias('tmp_screen_name'))
    .withColumn('entities_user_mentions_screen_name', concat_ws(', ', 'tmp_screen_name'))
)

И затем соединяю эти 2 кадра данных, чтобы получить нужные мне данные:

final_df = (
    main_df
    .join(entities_df, main_df.id == entities_df.tmp_id)
    .select(['created_at', 'id', ...])
)

Я получаю пустые фреймы данных при запуске.

Когда я запускаю коды на статических данных, используя следующие коды:

entities_df = (
    timestamp_df
    .select(['time', 'id', explode('entities.user_mentions').alias('temp')])
    .selectExpr(['time', 'id AS tmp_id', 'temp.screen_name'])
    .groupBy('tmp_id')
    .agg(collect_set('screen_name').alias('tmp_screen_name'))
    .withColumn('entities_user_mentions_screen_name', concat_ws(', ', 'tmp_screen_name'))
)

Если я запускаю вышеупомянутое (безВодяной знак) Я получаю эту ошибку:

Добавление режима вывода не поддерживается при потоковой агрегации на> потоковых DataFrames / DataSets без водяного знака

Может кто-нибудь сказать мне, что я 'я делаю неправильно?

1 Ответ

0 голосов
/ 26 февраля 2019

Хорошо, я решил проблему.Просто несколько изменений в коде:

timestamp_df = tmp_df2.withColumn('time', current_timestamp())

вместо:

timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
timestamp_df = tmp_df2.withColumn('time', unix_timestamp(lit(timestamp), 'yyyy-MM-dd HH:mm:ss').cast("timestamp"))

Для этого я не думаю, что это будет иметь значение, но я не пытался.

А для сущностей_df:

entities_df = (
    timestamp_df
    .select(['time', 'id', explode('entities.user_mentions').alias('temp')])
    .selectExpr(['time', 'id AS tmp_id', 'temp.screen_name'])
    .withWatermark('time', '5 seconds')
    .groupBy(
        'tmp_id',
        window('time', '5 seconds')
    )
    .agg(collect_set('screen_name').alias('tmp_screen_names'))
    .withColumn('entities_user_mentions_screen_names', concat_ws(', ', 'tmp_screen_names'))
)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...