Apache Луч Python SDK - Неточный интервал окна сеанса - PullRequest
0 голосов
/ 24 апреля 2020

Я пытаюсь обработать данные с интервалом в 60 минут, используя Apache Beam Python SDK. Но фактический интервал сеанса был неточным, например 3:00:00 или 1:01:00 или 1:50:00, когда я запускаю свое приложение.

Не могли бы вы помочь мне найти решение, чтобы исправить эту проблему и обработать данные с 60-минутным сеансом ?

Я построил свой трубопровод как показано ниже.

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | "Read" >> ReadFromText(known_args.input, skip_header_lines=1)
            | "Convert" >> ParDo(Convert())
            | "Add Timestamp" >> Map(lambda x: window.TimestampedValue(x, get_timestamp_from_element(x).timestamp()))
            | "Use User ID As Key" >> Map(lambda x: (x["user_id"], x))
            | "Apply Session Window" >> WindowInto(window.Sessions(known_args.session_interval))
            | "Group" >> GroupByKey()
            | "Write To CSV" >> ParDo(WriteToCSV(known_args.output))
        )
        result = pipeline.run()
        result.wait_until_finish()

session_interval (60 минут) указано ниже.

    parser.add_argument(
        "--session_interval",
        help="Interval of each session",
        default=60*60) # 60 mins

WriteToCSV функция обработки данных за сеанс. Я записал продолжительность сеанса, но она не была точной.

class WriteToCSV(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, element, window=DoFn.WindowParam):
        window_start = window.start.to_utc_datetime()
        window_end = window.end.to_utc_datetime()
        duration = window_end - window_start
        logging.info(">>> new %s record(s) in %s session (start %s end %s)", len(click_records), duration, window_start, window_end)
        ....

Затем я получил эти сообщения журнала, когда я запускаю это приложение локально с DirectRunner.

new 5 records in 3:00:00 session (start 2018-10-19 02:00:00 end 2018-10-19 05:00:00)
new 2 records in 1:01:00 session (start 2018-10-19 02:02:00 end 2018-10-19 03:03:00)
new 2 records in 1:50:00 session (start 2018-10-19 03:10:00 end 2018-10-19 05:00:00)

Я также развернул конвейер в Затем поток данных получил тот же результат.

new 2 record(s) in 1:50:00 session (start 2018-10-19 11:10:00 end 2018-10-19 13:00:00)
new 2 record(s) in 1:01:00 session (start 2018-10-19 10:02:00 end 2018-10-19 11:03:00)
new 5 record(s) in 3:00:00 session (start 2018-10-19 10:00:00 end 2018-10-19 13:00:00)

1 Ответ

1 голос
/ 24 апреля 2020

В вашем конвейере лучей переменная `` known_args.session_interval in window.Sessions` определяет длительность промежутка, то есть продолжительность, в течение которой, если для указанного ключа c дальнейшие события не приходят, окно закрывается. Каждый сеанс может иметь различную длительность начала и конца в зависимости от количества событий, которые обрабатываются конвейером для данного ключа. Это объясняется в графическом виде здесь

Например,

Key 1 - 10:00 AM ----|
Key 1 - 10:45 AM     |
Key 1 - 11:30 AM     |====> One Session Window for Key 1 of Duration 4hours 30 minutes
Key 1 - 12:15 PM     |
Key 1 - 01:00 PM ----|
Key 1 - 02:30 PM =========> Start of new session window for Key 1

Key 2 - 10:00 AM-----|
Key 2 - 10:30 AM     |====> One Session window for key 2 of Duration 1:00 hour
Key 2 - 11:00 PM-----|
Key 2 - 12:30 PM =========> Start of new session window for Key 2

Если вы заинтересованы в группировке и обработке событий каждые 60 минут, вам необходимо использовать Фиксированный Windows.

...