Задание Apache Beam завершается неудачно при выполнении оконного сеанса в большом наборе данных - PullRequest
2 голосов
/ 10 июня 2019

Я работаю над заданием Python Apache Beam с использованием оконных сессий для ограниченного набора данных. Это работает для небольших наборов данных, но работа умирает, когда я увеличиваю размер входных данных.

Идентификатор задания: 2019-06-10_07_28_32-2942508228086251217.

elements = (p | 'IngestData' >> beam.io.Read(big_query_source))

        elements | 'AddEventTimestamp' >> beam.ParDo(AddTimestampDoFn()) \
                        | 'SessionWindow' >> beam.WindowInto(window.Sessions(10 * 60)) \
                        | 'CreateTuple' >> beam.Map(lambda row: (row['id'], {'attribute1': row['attribute1'], 'date': row['date']})) \
                        | 'GroupById1' >> beam.GroupByKey() \
                        | 'AggregateSessions' >> beam.ParDo(AggregateTransactions()) \
                        | 'MergeWindows' >> beam.WindowInto(window.GlobalWindows()) \
                        | 'GroupById2' >> beam.GroupByKey() \
                        | 'MapSessionsToLists' >> beam.Map(lambda x: (x[0], [y for y in x[1]])) \
                        | 'BiggestSession' >> beam.ParDo(MaximumSession()) \
                        | "PrepForWrite" >> beam.Map(lambda x: x[1].update({"id": x[0]}) or x[1]) \
                        | 'WriteResult' >> WriteToText(known_args.output)

С классами DoFn, равными

class AddTimestampDoFn(beam.DoFn):
    def process(self, element):
        date = datetime.datetime.strptime(element['date'][:-4], '%Y-%m-%d %H:%M:%S.%f')
        unix_timestamp = float(date.strftime('%s'))
        yield beam.window.TimestampedValue(element, unix_timestamp)


class AggregateTransactions(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam):
        session_count = len(element[1])
        attributes = list(map(lambda row: row['attribute1'], element[1]))
        std = np.std(amounts)

        return [(element[0], {'session_count': session_count, 'session_std': std, 'window_start': window.start
                                                                                    .to_utc_datetime()
                                                                                    .strftime('%d-%b-%Y %H:%M:%S')})]


class MaximumSession(beam.DoFn):
    def process(self, element):
        sorted_counts = sorted(element[1], key = lambda x: x['session_count'], reverse=True)

        return [(element[0], {'session_count': sorted_counts[0]['session_count'], 
                                        'session_std': sorted_counts[0]['session_std'], 
                                        'window_start_time': sorted_counts[0]['window_start']})]

Задание не выполняется и выдает ошибку: The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. The work item was attempted on these workers:

Конкретные рабочие журналы на стекдрайвере ни о чем не говорят. Я просто получаю комбинацию из этих записей:

processing lull for over 431.44 seconds in state process-msecs in step s5

Refusing to split <dataflow_worker.shuffle.GroupedShuffleRangeTracker object at 0x7f82e970cbd0> at '\n\xaaG\t\x00\x01': proposed split position is out of range

Retry with exponential backoff: waiting for 4.69305060273 seconds before retrying lease_work because we caught exception: SSLError: ('The read operation timed out',)

Остальные записи носят информационный характер.

Последнее использование памяти для этого конкретного работника было 43413 МБ. Поскольку я использую n1-highmem-32 машин, я не думаю, что память может быть проблемой здесь.

На стороне клиента, Cloud Shell, где я запускаю эту работу, я только что получил много

INFO:oauth2client.transport:Refreshing due to a 401 (attempt 1/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 2/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 1/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 1/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 2/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 2/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 1/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 2/2)

до сбоя задания.

Есть идеи?

Спасибо

1 Ответ

0 голосов
/ 14 июня 2019

По умолчанию поток данных повторяет конвейер 4 раза, если возникает какая-либо ошибка в режиме BATCH, и неопределенное время при работе в режиме STREAM.

Пожалуйста, создайте инструментальные панели в драйвере стека для машин вычислительных машин, используемых для конвейера, чтобы проанализировать объем памяти, использование ЦП и операции ввода-вывода. Поднятие конфига конвейера должно произойти после тщательного анализа вышеперечисленных факторов.

Убедитесь, что все преобразования работают нормально на основе предоставленных вами данных, а также примените обработку исключений.

...