DataFlow DoFn неожиданно зависает при чтении из bigtable - PullRequest
0 голосов
/ 14 июня 2019

Наш конвейер потока данных имеет DoFn, который читает из bigtable с помощью клиентского API hbase multiget. По-видимому, это приводит к случайному останову потока данных со следующим стеком:

Обработка застряла в шаге AttachStuff / BigtableAttacher как минимум на 04h10m00s без вывода или завершения в состоянии процесса at sun.misc.Unsafe.park (родной метод) в java.util.concurrent.locks.LockSupport.park (LockSupport.java:175) на com.google.bigtable.repackaged.com.google.common.util.concurrent.AbstractFuture.get (AbstractFuture.java:523) на com.google.bigtable.repackaged.com.google.api.core.AbstractApiFuture.get (AbstractApiFuture.java:56) на com.google.cloud.bigtable.hbase.BatchExecutor.batchCallback (BatchExecutor.java:276) на com.google.cloud.bigtable.hbase.BatchExecutor.batch (BatchExecutor.java:239) на com.google.cloud.bigtable.hbase.AbstractBigtableTable.get (AbstractBigtableTable.java:241) на com.askscio.google.docbuilder.BigtableAnchorsAttacher.getAnchors (BigtableAnchorsAttacher.java:86) на com.askscio.google.docbuilder.BigtableAnchorsAttacher.process (BigtableAnchorsAttacher.java:129) в com.askscio.docbuilder.core.ScioDoFn.processWithErrorHandling (ScioDoFn.java:39) на com.askscio.google.docbuilder.BigtableAnchorsAttacher $ DoFnInvoker.invokeProcessElement (неизвестный источник)

Мы находимся в библиотеке лучей 2.12.0. DoFn устанавливает соединение с Bigtable в StartBundle.

Каждый вызов DoFn ищет не более 10 ключей от bigtable

Единый кластер, 3 узла и SSD. Использование хранилища составляет 2,2 ГБ, максимальная загрузка ЦП узла составляет 13%, а максимальная скорость чтения / записи составляет 2000 операций чтения / сек и 1000 операций записи / сек.

startBundle:

bigtableConn = BigtableConfiguration.connect(
    config.getString(ConfigKeys.Google.PROJECT_ID),
    config.getString(ConfigKeys.Google.INSTANCE_ID)
);
fooTable = bigtableConn.getTable(TableName.valueOf(BigtableDocumentStore.FOO_TABLE_NAME));

Процесс:

List<Get> gets = Lists.newArrayList();
// keys are no more than 10
for (String s : keys) {
   Get get = new Get(Bytes.toBytes(s))
                     .addFamily(Bytes.toBytes(BigtableDocumentStore.FOO_COLUMN_FAMILY))
                        .setMaxVersions(1);
   gets.add(get);
}
Result[] results= fooTable.get(gets);

демонтаж:

fooTable.close();
bigTableConn.close();

1 Ответ

0 голосов
/ 03 июля 2019

Я бы порекомендовал перенести управление соединениями в @Setup & Teardown и использовать счетчик ссылок, если вы используете многоядерных рабочих.

Соединения Bigtable имеют очень большой вес и предназначены для одного процесса. Объект соединения HBase, возвращаемый BigtableConfiguration.connect (), фактически оборачивает пул каналов grpc с 2 каналами на процессор, что очень дорого для построения.

У вас есть несколько вариантов улучшения конвейера:

  1. установите для параметра конфигурации "google.bigtable.use.cached.data.channel.pool" значение "true", что позволит повторно использовать внутренний пул соединений

  2. Сделайте что-то подобное в своем DoFn:

    // instance vars
    static Object connectionLock = new Object();
    static Connection bigtableConn = null;
    
    // @Setup
    synchronized(connectionLock) {
      if (numWorkers++ == 0) {
        bigtableConn = BigtableConfiguration.connect(...);
      } 
    }
    
    // @Teardown
    synchronized(connectionLock) {
      if (--numWorkers == 0) {
        bigtableConn.close();
      } 
    }
    
...