Как я могу отфильтровать повторяющиеся данные TableRow для большого запроса, чтобы удалить повторяющиеся строки? - PullRequest
0 голосов
/ 10 июля 2019

Я новичок в Dataflow, так что простите меня, если мой вопрос смешной, у меня есть csv-файл, который я читаю, и в нем есть повторяющиеся строки, я читаю эти данные и пишу в большой запрос, однако я не хочу писать дубликаты данные к моей таблице BQ.

Я думал об одном подходе, но я не знаю, как его реализовать, он включает в себя добавление какого-либо флага в схему, чтобы пометить его как уникальный, но я не знаю, как

Lists.newArrayList(
  new TableFieldSchema()
         .setName("person_id")
         .setMode("NULLABLE").setType("STRING"),
  new TableFieldSchema()
         .setName("person_name")
         .setMode("NULLABLE")
         .setType("STRING") // Cant I add another unique property here?
) 

Не знаю, сработает ли этот метод, но все, что мне нужно, это отфильтровать строки, извлеченные из преобразования, например

PCollection<TableRow> peopleRows = 
  pipeline
     .apply(
        "Convert to BiqQuery Table Row",
        ParDo.of(new FormatForBigquery())

    // Next step to filter duplicates

Ответы [ 2 ]

2 голосов
/ 10 июля 2019

Если мы рассматриваем вывод для вашего чтения CSV как PCollection, то мы можем устранить дубликаты, передав PCollection через преобразование Distinct .Цель этого преобразования - взять входную коллекцию PCollection и сгенерировать новую коллекцию PCollection, которая является исходной коллекцией PCollection без дубликатов.В рамках предварительно созданного преобразования Distinct есть возможность указать свои собственные функции, которые будут вызываться для определения того, что классифицирует два объекта PCollection как равные и, следовательно, какие из них следует удалить.

1 голос
/ 11 июля 2019

Вы можете сделать это непосредственно в bigquery, загрузить весь файл и выполнить запрос, подобный следующему, с таблицей назначения в исходной таблице.Нет необходимости в потоке данных.

WITH cte as (
  SELECT
    ROW_NUMBER() over (PARTITION BY column1,column2,column3,...) as idx,
    *
  FROM my_table
)
SELECT
*
FROM cte
WHERE idx = 1
...