Чтение больших таблиц из середины конвейера потока данных - PullRequest
2 голосов
/ 12 февраля 2020

У меня есть конвейер, который получает некоторые данные из sub pub, выполняет некоторую обработку, и мне нужно обработать все данные в Bigtable на основе результата этой обработки.

Например, у меня есть паб-сообщение типа: {clientId: 10}, поэтому мне нужно прочитать из Bigtable все данные для clientId 10 (я знаю, как создать сканирование на основе clientId). Проблема состоит в том, что оба чтения, которые мы имеем на данный момент для Bigtable (BigtableIO и CloudBigtableIO), основаны на том факте, что конвейер начинается с bigtable, поэтому я не могу (или не смог найти способ) использовать их в середине трубопровод. Как мне добиться этого случая?

Простой псевдоподобный код:

Pipeline p = Pipeline.create(...)
p.apply(PubsubIO.readMessagesWithAttributes ...)
.apply( PubsubMessageToScans()) // I know how to do this
.apply( ReadBigTable()) // How to do this?

Ответы [ 2 ]

3 голосов
/ 15 февраля 2020

Чтобы дополнить ответ @ Билли, вы также можете попробовать использовать класс BigtableDataClient внутри преобразования ParDo. Входными данными будут параметры, содержащиеся в PubsubMessage для настройки объекта Scan, затем в ParDo установите параметры сканирования, установите соединение с BigTable и получите отфильтрованные результаты.

Этот фрагмент может быть полезен:

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> out){

        String projectId = "<PROJECT_ID>";
        String instanceId = "<INSTANCE_ID>";
        String tableName = "<TABLENAME>";


        String[] scanParameters = element.split(",");

        try (Connection connection = BigtableConfiguration.connect(projectId, instanceId)){

            Table table = connection.getTable(TableName.valueOf(tableName));

            Scan scan = new Scan();
            scan.withStartRow(Bytes.toBytes(scanParameters[0]));
            scan.withStopRow(Bytes.toBytes(scanParameters[1]));

            ResultScanner scanner = table.getScanner(scan);

            for (Result row : scanner) {
                System.out.println(row);
            }

            catch (Exception e){
                e.printStackTrace();
            }

            out.output("");
        }

Я не тестировал его непосредственно с PubsubMessage, но вы можете выполнить другое преобразование, чтобы адаптировать сообщение или напрямую получить PubsubMessage и установить объект сканирования.

0 голосов
/ 12 февраля 2020

ОБНОВЛЕНО:

Недавно я играл с Bigtable и Dataflow и столкнулся с той же проблемой, которую вы описали здесь. Я не верю, что есть способ сделать Read.from(CloudBigtableIO.read(config) в середине конвейера, поэтому вам придется создать свой собственный DoFn. Вы можете расширить AbstractCloudBigtableTableDoFn и получить доступ к легко используемому и настраиваемому соединению Bigtable через getConnection(). Вот пример работы Dataflow / Beam, которую я собрал, которая показывает, как это сделать:

public class ReadInMiddleOfPipeline {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);

    Pipeline p = Pipeline.create(options);
    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(GenerateSequence.from(0).to(10).withRate(1, new Duration(1000)))
        .apply(ParDo.of(new ReadFromTableFn(bigtableTableConfig)));

    p.run().waitUntilFinish();
  }

  static class ReadFromTableFn extends AbstractCloudBigtableTableDoFn<Long, Void> {
    public ReadFromTableFn(CloudBigtableConfiguration config) {
      super(config);
    }

    @ProcessElement
    public void processElement(@Element Long input, OutputReceiver<Void> out, PipelineOptions po) {
      BigtableOptions options = po.as(BigtableOptions.class);
      try {
        Table table = getConnection().getTable(TableName.valueOf(options.getBigtableTableId()));
        Scan scan = new Scan().setRowPrefixFilter(Bytes.toBytes("#phone"));
        ResultScanner rows = table.getScanner(scan);

        for (Result row : rows) {
          System.out.printf(
              "Reading data for %s%n", Bytes.toString(row.rawCells()[0].getRowArray()));
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("bigtable-table")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...