Наш конвейер потока данных имеет DoFn, который читает из bigtable с помощью клиентского API hbase multiget. По-видимому, это приводит к случайному останову потока данных со следующим стеком:
Обработка застряла в шаге AttachStuff / BigtableAttacher как минимум на 04h10m00s без вывода или завершения в состоянии процесса
at sun.misc.Unsafe.park (родной метод)
в java.util.concurrent.locks.LockSupport.park (LockSupport.java:175)
на com.google.bigtable.repackaged.com.google.common.util.concurrent.AbstractFuture.get (AbstractFuture.java:523)
на com.google.bigtable.repackaged.com.google.api.core.AbstractApiFuture.get (AbstractApiFuture.java:56)
на com.google.cloud.bigtable.hbase.BatchExecutor.batchCallback (BatchExecutor.java:276)
на com.google.cloud.bigtable.hbase.BatchExecutor.batch (BatchExecutor.java:239)
на com.google.cloud.bigtable.hbase.AbstractBigtableTable.get (AbstractBigtableTable.java:241)
на com.askscio.google.docbuilder.BigtableAnchorsAttacher.getAnchors (BigtableAnchorsAttacher.java:86)
на com.askscio.google.docbuilder.BigtableAnchorsAttacher.process (BigtableAnchorsAttacher.java:129)
в com.askscio.docbuilder.core.ScioDoFn.processWithErrorHandling (ScioDoFn.java:39)
на com.askscio.google.docbuilder.BigtableAnchorsAttacher $ DoFnInvoker.invokeProcessElement (неизвестный источник)
Мы находимся в библиотеке лучей 2.12.0. DoFn устанавливает соединение с Bigtable в StartBundle.
Каждый вызов DoFn ищет не более 10 ключей от bigtable
Единый кластер, 3 узла и SSD. Использование хранилища составляет 2,2 ГБ, максимальная загрузка ЦП узла составляет 13%, а максимальная скорость чтения / записи составляет 2000 операций чтения / сек и 1000 операций записи / сек.
startBundle:
bigtableConn = BigtableConfiguration.connect(
config.getString(ConfigKeys.Google.PROJECT_ID),
config.getString(ConfigKeys.Google.INSTANCE_ID)
);
fooTable = bigtableConn.getTable(TableName.valueOf(BigtableDocumentStore.FOO_TABLE_NAME));
Процесс:
List<Get> gets = Lists.newArrayList();
// keys are no more than 10
for (String s : keys) {
Get get = new Get(Bytes.toBytes(s))
.addFamily(Bytes.toBytes(BigtableDocumentStore.FOO_COLUMN_FAMILY))
.setMaxVersions(1);
gets.add(get);
}
Result[] results= fooTable.get(gets);
демонтаж:
fooTable.close();
bigTableConn.close();