У меня есть простой конвейер, который преобразует события из Кафки в Большой запрос.Я хочу сохранить все ошибочные строки в большом запросе в другой таблице, и я хочу добавить событие источника, чтобы получить дополнительную информацию об ошибке.НО, потому что API BigQuery отправляет мне только неудачную строку, мне нужно использовать некоторые технологии для достижения контекста.Итак, я использую SideInputView, содержит карту между идентификатором строки и ее событием, а затем, когда строка не может быть сохранена, я беру ее событие из SideInputView.
В результате всего этого дизайна.эта работа 10% от событий.большинство событий получают пустое значение с боковой карты вида ввода.Но у меня нет
моего кода:
// create view by row id has
PCollectionView<Map<String, String>> tableRowToInsertView =
tableRowToInsertCollection
.apply(Window.<TableRowWithEvent>into(FixedWindows.of(Duration.standardMinutes(1)))
.withAllowedLateness(Duration.standardSeconds(10))
.accumulatingFiredPanes()
.triggering(AfterProcessingTime.pastFirstElementInPane()))
.apply(MapElements.via(
new SimpleFunction<TableRowWithEvent, KV<String, String>>() {
@Override
public KV<String, String> apply(TableRowWithEvent input) {
return KV.of(input.getTableRow().get("uuid").toString(), input.getEvent());
}
}))
.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
.apply("CreateView", View.asMap());
//using later the side input view
writeResult
.getFailedInsertsWithErr()
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply("BQ-insert-error-extract", ParDo.of(new BigQueryInsertErrorExtractFn(tableRowToInsertView)).withSideInputs(tableRowToInsertView))
.apply("BQ-insert-error-write", BigQueryIO.writeTableRows()
.to(errTableSpec)
.withJsonSchema(errSchema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
и самой функции преобразования, которая использует SideInputView
public class BigQueryInsertErrorExtractFn extends DoFn<BigQueryInsertError, TableRow> {
private final PCollectionView<Map<String, String>> tableRowToInsertView;
public BigQueryInsertErrorExtractFn(PCollectionView<Map<String, String>> tableRowToInsertView) {
this.tableRowToInsertView = tableRowToInsertView;
}
@ProcessElement
public void processElement(ProcessContext context) {
BigQueryInsertError bigQueryInsertError=context.element();
TableRow row = bigQueryInsertError.getRow();
TableRow convertedRow = new TableRow();
convertedRow.set("table_row", row.toString());
convertedRow.set("error", bigQueryInsertError.getError().toString());
convertedRow.set("error_type", ERROR_TYPE.BQ_INSERT);
convertedRow.set("t", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS").format(new Date()));
Map<String, String> sideInput = context.sideInput(tableRowToInsertView);
String event = sideInput.get(bigQueryInsertError.getRow().get("uuid").toString());
if (event != null) {
convertedRow.set("event", event);
}
context.output(convertedRow);
}
}