Пустые значения при попытке получить Apache beam DataFlow SideInput - PullRequest
0 голосов
/ 10 февраля 2019

У меня есть простой конвейер, который преобразует события из Кафки в Большой запрос.Я хочу сохранить все ошибочные строки в большом запросе в другой таблице, и я хочу добавить событие источника, чтобы получить дополнительную информацию об ошибке.НО, потому что API BigQuery отправляет мне только неудачную строку, мне нужно использовать некоторые технологии для достижения контекста.Итак, я использую SideInputView, содержит карту между идентификатором строки и ее событием, а затем, когда строка не может быть сохранена, я беру ее событие из SideInputView.

В результате всего этого дизайна.эта работа 10% от событий.большинство событий получают пустое значение с боковой карты вида ввода.Но у меня нет

моего кода:

// create view by row id has
 PCollectionView<Map<String, String>> tableRowToInsertView =
                tableRowToInsertCollection
                        .apply(Window.<TableRowWithEvent>into(FixedWindows.of(Duration.standardMinutes(1)))
                                .withAllowedLateness(Duration.standardSeconds(10))
                                .accumulatingFiredPanes()
                                .triggering(AfterProcessingTime.pastFirstElementInPane()))
                        .apply(MapElements.via(
                                new SimpleFunction<TableRowWithEvent, KV<String, String>>() {
                                    @Override
                                    public KV<String, String> apply(TableRowWithEvent input) {
                                        return KV.of(input.getTableRow().get("uuid").toString(), input.getEvent());
                                    }
                                }))
                        .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
                        .apply("CreateView", View.asMap());



//using later the side input view
  writeResult
                .getFailedInsertsWithErr()
                .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
                .apply("BQ-insert-error-extract", ParDo.of(new BigQueryInsertErrorExtractFn(tableRowToInsertView)).withSideInputs(tableRowToInsertView))
                .apply("BQ-insert-error-write", BigQueryIO.writeTableRows()
                        .to(errTableSpec)
                        .withJsonSchema(errSchema)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

и самой функции преобразования, которая использует SideInputView

public class BigQueryInsertErrorExtractFn extends DoFn<BigQueryInsertError, TableRow> {


private final PCollectionView<Map<String, String>> tableRowToInsertView;


public BigQueryInsertErrorExtractFn(PCollectionView<Map<String, String>> tableRowToInsertView) {
    this.tableRowToInsertView = tableRowToInsertView;
}

@ProcessElement
public void processElement(ProcessContext context) {

    BigQueryInsertError bigQueryInsertError=context.element();
    TableRow row = bigQueryInsertError.getRow();

    TableRow convertedRow = new TableRow();
    convertedRow.set("table_row", row.toString());
    convertedRow.set("error", bigQueryInsertError.getError().toString());
    convertedRow.set("error_type", ERROR_TYPE.BQ_INSERT);
    convertedRow.set("t", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS").format(new Date()));

    Map<String, String> sideInput = context.sideInput(tableRowToInsertView);

    String event = sideInput.get(bigQueryInsertError.getRow().get("uuid").toString());
    if (event != null) {
        convertedRow.set("event", event);
    }

    context.output(convertedRow);

}

}

...