Попытка реализовать следующие сценарии,
- Соединение двух таблиц (A, B) с одинаковыми ключами
- Таблица фильтра (c)
- Присоединение к результатуШаг 1 и результат шага 2. Здесь он имеет другое имя ключа, но одинаковые значения (например: имя столбца 1-й таблицы равно «id», имя столбца 2-й таблицы - «Fid», но оба значения одинаковы).
при выполнении кода с использованием облачного потока данных появляется ошибка ниже,
SEVERE: 2018-12-03T13: 52: 47.634Z: java.lang.IllegalStateException: ожидаются уникальные ключи, но найден ключ 127348 # ноль со значениями{HEADER_ID = 18219955, ORDER_TYPE_ID = 2124, ORDER_NUMBER = 729637, ORDERED_DATE = 10/29/2018 4:01:25 PM, TRANSACTIONAL_CURR_CODE = USD, CUST_PO_NUMBER = 942634, SOLD_TO_ORG_ID = 127348, SHIP_FROM_ORG_ID = 934, PRICE_LIST_ID = 7035, = 2018 CREATION_DATE-10-29 16:10:41 UTC, LAST_UPDATE_DATE = 2018-10-29 16:10:13 UTC, FLOW_STATUS_CODE = BOOKED} и {HEADER_ID = 18219945, ORDER_TYPE_ID = 2124, ORDER_NUMBER = 729636, ORDERED_DATE = 10/293:56:05 вечера, TRANSACTIONAL_CURR_CODE = USD,CUST_PO_NUMBER = 941674, SOLD_TO_ORG_ID = 127348, SHIP_FROM_ORG_ID = 934, PRICE_LIST_ID = 7035, CREATION_DATE = 2018-10-29 15:10:20 UTC, LAST_UPDATE_DATE = 2018-10-29 16:10:34 UTC, FLOW_ST в UTC, FLOW_STorg.apache.beam.sdk.transforms.windowing.GlobalWindow@6c5cc8ee.at org.apache.beam.runners.dataflow.BatchViewOverrides $ BatchViewAsMultimap $ ToIsmRecordForMapLikeDoFn.processElement (BatchViewOverrides.java:442)
Вот весь код, который я пробовал:
WithKeys<String, TableRow> headerKey = WithKeys.of( (TableRow row) -> String.format("%s",row.get("PARTY_ID"))).withKeyType(TypeDescriptors.strings());
PCollection<KV<String,TableRow>> mainInput = p.apply("ReadCustomerAccount",BigQueryIO.readTableRows().from(options.getCustAccount())).apply("WithKeys", headerKey);
PCollection<KV<String,TableRow>> sideInput = p.apply("ReadCustomerParty",BigQueryIO.readTableRows().from(options.getPartyTable())).apply("WithKeys", headerKey);
PCollection<TableRow> result = CommonFunctions.innerJoinBQTbls("InnerJoin",mainInput,sideInput);
@SuppressWarnings("serial")
PCollection<TableRow> finalResultCollection = result.apply("Process", ParDo.of(new DoFn<TableRow, TableRow>()
{
@ProcessElement
public void processElement(ProcessContext c)
{
TableRow keyString = c.element();
TableRow mainList = (TableRow) keyString.get("main");
TableRow sideList = (TableRow) keyString.get("side");
TableRow targetRow = new TableRow();
targetRow.set("partyID", Integer.valueOf(keyString.get("key").toString()));
targetRow.set("accountNumber", mainList.get("ACCOUNT_NUMBER"));
targetRow.set("customerName", sideList.get("PARTY_NAME"));
targetRow.set("updatedDate",keyString.get("updatedDate"));
c.output(targetRow);
}
}));
PCollection<TableRow> headerData = p.apply("ReadInvoice",BigQueryIO.readTableRows().from(options.getOrderHeaderAll()));
PCollection<TableRow> pc934Collection = headerData.apply(Filter.by(
(TableRow t) -> {
String orgCode = t.get("SHIP_FROM_ORG_ID").toString();
if (orgCode.equals("934")) {
return true;
}
return false;
}
));
WithKeys<String, TableRow> soltoOrg = WithKeys.of(
(TableRow row) ->
String.format("%s#%s",
row.get("SOLD_TO_ORG_ID"),
row.get("CUST_ACCOUNT_ID")))
.withKeyType(TypeDescriptors.strings());
PCollection<KV<String,TableRow>> customerHeaderAccount = pc934Collection.apply("WithKeys", soltoOrg);
PCollection<KV<String,TableRow>> customerHeaderAll = finalResultCollection.apply("WithKeys", soltoOrg);
PCollection<TableRow> secondResult = CommonFunctions.innerJoinBQTbls("InnerJoin1",customerHeaderAll,customerHeaderAccount);
@SuppressWarnings("serial")
PCollection<TableRow> secondResultCollection = secondResult.apply("Process", ParDo.of(new DoFn<TableRow, TableRow>()
{
@ProcessElement
public void processElement(ProcessContext c)
{
TableRow keyString = c.element();
TableRow mainList = (TableRow) keyString.get("main");
TableRow sideList = (TableRow) keyString.get("side");
TableRow targetRow = new TableRow();
targetRow.set("orderNumber", mainList.get("ORDER_NUMBER"));
targetRow.set("headerId", Integer.valueOf(mainList.get("HEADER_ID").toString()));
targetRow.set("partyID", Integer.valueOf(keyString.get("key").toString()));
targetRow.set("accountNumber", mainList.get("ACCOUNT_NUMBER"));
targetRow.set("customerName", sideList.get("PARTY_NAME"));
targetRow.set("updatedDate",keyString.get("updatedDate"));
c.output(targetRow);
}
}));