Чтение BigTable и преобразование в общие записи с использованием GCP Cloud DataFlow - PullRequest
0 голосов
/ 07 января 2019

Я пытаюсь преобразовать данные таблицы BigTable в родовую запись, используя поток данных. После того, как преобразование выполнено, я должен сравнить его с другими наборами данных в корзине. Ниже мой псевдокод, для конвейера я использовал

  pipeline
     .apply("Read from bigtable", BigTableIo.read)
     .apply("Transform BigTable to Avro Genric Records ",
         ParDo.of(new TransformAvro(out.toString())))
     .apply("Compare to existing avro file ")
     .apply("Write back the data to bigTable")

// Function code is below to convert genric record     

 public class BigTableToAvroFunction
    extends DoFn<KV<ByteString, Iterable<Mutation>>, GenericRecord>  {
       @ProcessElement
       public void processelement(ProcessContext context){
         GenericRecord gen = null ;
         ByteString key = context.element().getKey();
         Iterable<Mutation> value  = context.element().getValue();
         KV<ByteString, Iterable<Mutation>> element = context.element(); 
 } 

Я застрял здесь.

1 Ответ

0 голосов
/ 08 января 2019

Неясно, что вы имеете в виду, сравнивая с существующими данными в корзине. Это зависит от того, как вы хотите сделать сравнение, каков размер файла, возможно, другие вещи. Примеры ввода против вывода помогли бы.

Например, если то, что вы пытаетесь сделать, похоже на операцию Join, вы можете попробовать использовать CoGroupByKey ( ссылка на документ ), чтобы объединить два PCollections, одно чтение из BigTable другое чтение Avros от GCS .

Или же, если файл имеет разумный размер (умещается в памяти), вы, вероятно, можете смоделировать его как боковой ввод ( ссылка на документ ).

Или, в конечном итоге, вы всегда можете использовать сырой GCS API для запроса данных в ParDo и делать все вручную.

...