Как провести интеграционное тестирование записи конвейера потока данных в Bigtable? - PullRequest
0 голосов
/ 03 июля 2018

По данным сайта Beam ,

Часто быстрее и проще выполнять локальное модульное тестирование на вашем код конвейера, чем для отладки удаленного выполнения конвейера.

Я хочу использовать тестовую разработку для своего приложения Beam / Dataflow, которое по этой причине пишет в Bigtable.

Однако, следуя документации по тестированию Beam, я попал в тупик - PAssert бесполезен, потому что выходной PCollection содержит объекты org.apache.hadoop.hbase.client.Put, которые не переопределяют метод equals.

Я не могу получить содержимое PCollection для проверки на них, так как

Невозможно получить содержимое PCollection напрямую - Apache Beam или Dataflow больше похож на план запроса того, что обработка должна быть сделана, с PCollection, являющейся логическим промежуточный узел в плане, а не содержащий данные.

Так как я могу протестировать этот конвейер, кроме запуска его вручную? Я использую Maven и JUnit (в Java, поскольку это все, что поддерживает Bigtable Connector , кажется, поддерживает).

1 Ответ

0 голосов
/ 03 июля 2018

Плагин Bigtable Emulator Maven можно использовать для написания интеграционных тестов для этого:

  • Сконфигурируйте Плагин Maven Failsafe и измените окончание вашего тестового набора с * Test на * IT для запуска в качестве интеграционного теста.
  • Установите эмулятор Bigtable в gcloud sdk в командной строке:

    gcloud components install bigtable   
    

    Обратите внимание, что этот необходимый шаг приведет к снижению переносимости кода (например, будет ли он выполняться в вашей системе сборки? На компьютерах других разработчиков?), Поэтому я собираюсь его контейнировать с помощью Docker перед развертыванием в системе сборки.

  • Добавьте плагин эмулятора в pom согласно README

  • Используйте HBase Client API и посмотрите пример интеграционного теста Bigtable Emulator для настройки сеанса и таблиц.

  • Напишите свой тест в обычном режиме в соответствии с документацией Beam, за исключением того, что вместо использования PAssert на самом деле вызовите CloudBigtableIO.writeToTable, а затем используйте HBase Client для чтения данных из таблицы, чтобы проверить их.

Вот пример интеграционного теста:

package adair.example;

import static org.apache.hadoop.hbase.util.Bytes.toBytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.junit.Assert;
import org.junit.Test;

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;

/**
 *  A simple integration test example for use with the Bigtable Emulator maven plugin.
 */
public class DataflowWriteExampleIT {

  private static final String PROJECT_ID = "fake";
  private static final String INSTANCE_ID = "fakeinstance";
  private static final String TABLE_ID = "example_table";
  private static final String COLUMN_FAMILY = "cf";
  private static final String COLUMN_QUALIFIER = "cq";

  private static final CloudBigtableTableConfiguration TABLE_CONFIG =
    new CloudBigtableTableConfiguration.Builder()
      .withProjectId(PROJECT_ID)
      .withInstanceId(INSTANCE_ID)
      .withTableId(TABLE_ID)
      .build();

  public static final List<String> VALUES_TO_PUT = Arrays
    .asList("hello", "world", "introducing", "Bigtable", "plus", "Dataflow", "IT");

  @Test
  public void testPipelineWrite() throws IOException {
    try (Connection connection = BigtableConfiguration.connect(PROJECT_ID, INSTANCE_ID)) {
      Admin admin = connection.getAdmin();
      createTable(admin);

      List<Mutation> puts = createTestPuts();

      //Use Dataflow to write the data--this is where you'd call the pipeline you want to test.
      Pipeline p = Pipeline.create();
      p.apply(Create.of(puts)).apply(CloudBigtableIO.writeToTable(TABLE_CONFIG));
      p.run().waitUntilFinish();

      //Read the data from the table using the regular hbase api for validation
      ResultScanner scanner = getTableScanner(connection);
      List<String> resultValues = new ArrayList<>();
      for (Result row : scanner) {
        String cellValue = getRowValue(row);
        System.out.println("Found value in table: " + cellValue);
        resultValues.add(cellValue);
      }

      Assert.assertThat(resultValues,
        IsIterableContainingInAnyOrder.containsInAnyOrder(VALUES_TO_PUT.toArray()));
    }
  }

  private void createTable(Admin admin) throws IOException {
    HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(TABLE_ID));
    tableDesc.addFamily(new HColumnDescriptor(COLUMN_FAMILY));

    admin.createTable(tableDesc);
  }

  private ResultScanner getTableScanner(Connection connection) throws IOException {
    Scan scan = new Scan();
    Table table = connection.getTable(TableName.valueOf(TABLE_ID));
    return table.getScanner(scan);
  }

  private String getRowValue(Result row) {
    return Bytes.toString(row.getValue(toBytes(COLUMN_FAMILY), toBytes(COLUMN_QUALIFIER)));
  }

  private List<Mutation> createTestPuts() {
    return VALUES_TO_PUT
          .stream()
          .map(this::stringToPut)
          .collect(Collectors.toList());
  }

  private Mutation stringToPut(String cellValue){
    String key = UUID.randomUUID().toString();
    Put put = new Put(toBytes(key));
    put.addColumn(toBytes(COLUMN_FAMILY), toBytes(COLUMN_QUALIFIER), toBytes(cellValue));
    return put;
  }

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...