Я полагаю, что ваш второй подход имеет наибольшее значение.
Внутри вашей функции элемента процесса вы можете перехватывать любые исключения и регистрировать любые ошибки:
import org.sfl4j.Logger;
import org.slf4j.LoggerFactory;
import ...
public class MyDoFn<ObjectWithPubsubIdA, ObjectWithPubsubIdB> {
private static final Logger LOG = LoggerFactory.getLogger(MyDoFn.class);
@ProcessElement
public void processElement(ProcessContext c) {
ObjectWithPubsubIdA a = c.element();
try {
ObjectWithPubsubIdB b = // transform ObjectWithPubsubIdA ...
c.output(b);
} catch (Exception e) {
LOG.error("MyDoFn failed for message with id {} with exception {}", a.getId(), e);
}
}
}
Вы можете использовать абстрактный базовый классили какой-либо другой языковой конструкции для повторного использования кода, чтобы вы могли совместно использовать одну реализацию для всех ваших преобразований.