Соедините две таблицы с разными именами ключей - PullRequest
0 голосов
/ 03 декабря 2018

Попытка реализовать следующие сценарии,

  1. Соединение двух таблиц (A, B) с одинаковыми ключами
  2. Таблица фильтра (c)
  3. Присоединение к результатуШаг 1 и результат шага 2. Здесь он имеет другое имя ключа, но одинаковые значения (например: имя столбца 1-й таблицы равно «id», имя столбца 2-й таблицы - «Fid», но оба значения одинаковы).

при выполнении кода с использованием облачного потока данных появляется ошибка ниже,

SEVERE: 2018-12-03T13: 52: 47.634Z: java.lang.IllegalStateException: ожидаются уникальные ключи, но найден ключ 127348 # ноль со значениями{HEADER_ID = 18219955, ORDER_TYPE_ID = 2124, ORDER_NUMBER = 729637, ORDERED_DATE = 10/29/2018 4:01:25 PM, TRANSACTIONAL_CURR_CODE = USD, CUST_PO_NUMBER = 942634, SOLD_TO_ORG_ID = 127348, SHIP_FROM_ORG_ID = 934, PRICE_LIST_ID = 7035, = 2018 CREATION_DATE-10-29 16:10:41 UTC, LAST_UPDATE_DATE = 2018-10-29 16:10:13 UTC, FLOW_STATUS_CODE = BOOKED} и {HEADER_ID = 18219945, ORDER_TYPE_ID = 2124, ORDER_NUMBER = 729636, ORDERED_DATE = 10/293:56:05 вечера, TRANSACTIONAL_CURR_CODE = USD,CUST_PO_NUMBER = 941674, SOLD_TO_ORG_ID = 127348, SHIP_FROM_ORG_ID = 934, PRICE_LIST_ID = 7035, CREATION_DATE = 2018-10-29 15:10:20 UTC, LAST_UPDATE_DATE = 2018-10-29 16:10:34 UTC, FLOW_ST в UTC, FLOW_STorg.apache.beam.sdk.transforms.windowing.GlobalWindow@6c5cc8ee.at org.apache.beam.runners.dataflow.BatchViewOverrides $ BatchViewAsMultimap $ ToIsmRecordForMapLikeDoFn.processElement (BatchViewOverrides.java:442)

Вот весь код, который я пробовал:

WithKeys<String, TableRow> headerKey = WithKeys.of( (TableRow row) -> String.format("%s",row.get("PARTY_ID"))).withKeyType(TypeDescriptors.strings());

            PCollection<KV<String,TableRow>> mainInput = p.apply("ReadCustomerAccount",BigQueryIO.readTableRows().from(options.getCustAccount())).apply("WithKeys", headerKey);
            PCollection<KV<String,TableRow>> sideInput = p.apply("ReadCustomerParty",BigQueryIO.readTableRows().from(options.getPartyTable())).apply("WithKeys", headerKey);


            PCollection<TableRow> result  = CommonFunctions.innerJoinBQTbls("InnerJoin",mainInput,sideInput);

            @SuppressWarnings("serial")
            PCollection<TableRow> finalResultCollection =  result.apply("Process", ParDo.of(new DoFn<TableRow, TableRow>() 
            {
                  @ProcessElement
                  public void processElement(ProcessContext c) 
                  {
                      TableRow keyString = c.element();

                      TableRow mainList = (TableRow) keyString.get("main");
                      TableRow sideList = (TableRow) keyString.get("side");

                      TableRow targetRow = new TableRow();

                      targetRow.set("partyID", Integer.valueOf(keyString.get("key").toString()));
                      targetRow.set("accountNumber", mainList.get("ACCOUNT_NUMBER"));
                      targetRow.set("customerName", sideList.get("PARTY_NAME"));
                      targetRow.set("updatedDate",keyString.get("updatedDate"));

                      c.output(targetRow);
                  }
            }));

            PCollection<TableRow> headerData = p.apply("ReadInvoice",BigQueryIO.readTableRows().from(options.getOrderHeaderAll()));

            PCollection<TableRow> pc934Collection = headerData.apply(Filter.by(
                     (TableRow t) -> {
                         String orgCode = t.get("SHIP_FROM_ORG_ID").toString();
                         if (orgCode.equals("934")) {
                             return true;
                         }
                         return false;
                     }
                    ));

            WithKeys<String, TableRow> soltoOrg = WithKeys.of(
                    (TableRow row) ->
                        String.format("%s#%s",
                            row.get("SOLD_TO_ORG_ID"),
                            row.get("CUST_ACCOUNT_ID")))
                    .withKeyType(TypeDescriptors.strings());

            PCollection<KV<String,TableRow>> customerHeaderAccount = pc934Collection.apply("WithKeys", soltoOrg);
            PCollection<KV<String,TableRow>> customerHeaderAll = finalResultCollection.apply("WithKeys", soltoOrg);

            PCollection<TableRow> secondResult  = CommonFunctions.innerJoinBQTbls("InnerJoin1",customerHeaderAll,customerHeaderAccount);

            @SuppressWarnings("serial")
            PCollection<TableRow> secondResultCollection =  secondResult.apply("Process", ParDo.of(new DoFn<TableRow, TableRow>() 
            {
                  @ProcessElement
                  public void processElement(ProcessContext c) 
                  {
                      TableRow keyString = c.element();

                      TableRow mainList = (TableRow) keyString.get("main");
                      TableRow sideList = (TableRow) keyString.get("side");

                      TableRow targetRow = new TableRow();

                      targetRow.set("orderNumber", mainList.get("ORDER_NUMBER"));
                      targetRow.set("headerId",  Integer.valueOf(mainList.get("HEADER_ID").toString()));
                      targetRow.set("partyID", Integer.valueOf(keyString.get("key").toString()));
                      targetRow.set("accountNumber", mainList.get("ACCOUNT_NUMBER"));
                      targetRow.set("customerName", sideList.get("PARTY_NAME"));
                      targetRow.set("updatedDate",keyString.get("updatedDate"));

                      c.output(targetRow);
                  }
            }));

1 Ответ

0 голосов
/ 15 апреля 2019

Скорее всего, один из ваших ключей равен нулю.Вы можете решить эту проблему, не имея этого в качестве первичного ключа.Первичные ключи не могут быть NULL или, если они составные первичные ключи, не могут содержать NULL.Вместо этого сделайте его уникальным индексом.Например, используйте поле автономного номера для первичного ключа.

...