Отслеживание журнала в потоковом конвейере потока данных - PullRequest
0 голосов
/ 22 мая 2019

У меня есть настройка потока данных с несколькими конвейерами, извлекающая данные из пабов. Поскольку эти конвейеры разветвляются и объединяются с трансформаторами и цепями DoFunctions, необходимо отслеживать каждое сообщение pubsub, поступающее по всему конвейеру.

Каков был бы правильный способ сделать это? Некоторые мысли:

  1. Боковой ввод
  2. Каждый вход в функцию ParDo для создания объекта контекста с идентификаторами трассировки и т. Д. (Немного не интуитивно понятно)

Спасибо!

1 Ответ

0 голосов
/ 23 мая 2019

Я полагаю, что ваш второй подход имеет наибольшее значение.

Внутри вашей функции элемента процесса вы можете перехватывать любые исключения и регистрировать любые ошибки:

import org.sfl4j.Logger;
import org.slf4j.LoggerFactory;
import ...

public class MyDoFn<ObjectWithPubsubIdA, ObjectWithPubsubIdB> {
  private static final Logger LOG = LoggerFactory.getLogger(MyDoFn.class);

  @ProcessElement
  public void processElement(ProcessContext c) {
    ObjectWithPubsubIdA a = c.element();
    try {
      ObjectWithPubsubIdB b = // transform ObjectWithPubsubIdA ...
      c.output(b);
    } catch (Exception e) {
      LOG.error("MyDoFn failed for message with id {} with exception {}", a.getId(), e);
    }
  }
}

Вы можете использовать абстрактный базовый классили какой-либо другой языковой конструкции для повторного использования кода, чтобы вы могли совместно использовать одну реализацию для всех ваших преобразований.

...