Не удается записать ключ в KeyValueTable - PullRequest
1 голос
/ 20 ноября 2019

Я разработал и успешно развернул пользовательский плагин пакетного источника для платформы CDAP в Google Data Fusion. Плагин иногда работает в режиме предварительного просмотра, но всегда происходит сбой при развертывании конвейера со следующей ошибкой:

org.apache.tephra.TransactionFailureException: Unable to persist changes of transaction-aware 'RecordGenerator' for transaction 1574271280330000000
    at io.cdap.cdap.data2.transaction.AbstractTransactionContext.createTransactionFailure(AbstractTransactionContext.java:313) ~[na:na]
    at io.cdap.cdap.data2.transaction.AbstractTransactionContext.persist(AbstractTransactionContext.java:260) ~[na:na]
    at io.cdap.cdap.data2.transaction.AbstractTransactionContext.finish(AbstractTransactionContext.java:115) ~[na:na]
    at io.cdap.cdap.data2.transaction.Transactions$CacheBasedTransactional.finishExecute(Transactions.java:230) ~[na:na]
    at io.cdap.cdap.data2.transaction.Transactions$CacheBasedTransactional.execute(Transactions.java:211) ~[na:na]
    at io.cdap.cdap.internal.app.runtime.AbstractContext.execute(AbstractContext.java:546) ~[na:na]
    at io.cdap.cdap.internal.app.runtime.AbstractContext.execute(AbstractContext.java:534) ~[na:na]
    at io.cdap.cdap.app.runtime.spark.BasicSparkClientContext.execute(BasicSparkClientContext.java:333) ~[io.cdap.cdap.cdap-spark-core2_2.11-6.1.0.jar:na]
    at io.cdap.cdap.etl.common.submit.SubmitterPlugin.prepareRun(SubmitterPlugin.java:69) ~[na:na]
    at io.cdap.cdap.etl.batch.PipelinePhasePreparer.prepare(PipelinePhasePreparer.java:111) ~[na:na]
    at io.cdap.cdap.etl.spark.batch.SparkPreparer.prepare(SparkPreparer.java:104) ~[na:na]
    at io.cdap.cdap.etl.spark.batch.ETLSpark.initialize(ETLSpark.java:112) ~[na:na]
    at io.cdap.cdap.api.spark.AbstractSpark.initialize(AbstractSpark.java:131) ~[na:na]
    at io.cdap.cdap.api.spark.AbstractSpark.initialize(AbstractSpark.java:33) ~[na:na]
    at io.cdap.cdap.app.runtime.spark.SparkRuntimeService$2.initialize(SparkRuntimeService.java:167) ~[io.cdap.cdap.cdap-spark-core2_2.11-6.1.0.jar:na]
    at io.cdap.cdap.app.runtime.spark.SparkRuntimeService$2.initialize(SparkRuntimeService.java:162) ~[io.cdap.cdap.cdap-spark-core2_2.11-6.1.0.jar:na]
    at io.cdap.cdap.internal.app.runtime.AbstractContext.lambda$initializeProgram$1(AbstractContext.java:640) ~[na:na]
    at io.cdap.cdap.internal.app.runtime.AbstractContext.execute(AbstractContext.java:600) ~[na:na]
    at io.cdap.cdap.internal.app.runtime.AbstractContext.initializeProgram(AbstractContext.java:637) ~[na:na]
    at io.cdap.cdap.app.runtime.spark.SparkRuntimeService.initialize(SparkRuntimeService.java:433) ~[io.cdap.cdap.cdap-spark-core2_2.11-6.1.0.jar:na]
    at io.cdap.cdap.app.runtime.spark.SparkRuntimeService.startUp(SparkRuntimeService.java:208) ~[io.cdap.cdap.cdap-spark-core2_2.11-6.1.0.jar:na]
    at com.google.common.util.concurrent.AbstractExecutionThreadService$1$1.run(AbstractExecutionThreadService.java:47) ~[com.google.guava.guava-13.0.1.jar:na]
    at io.cdap.cdap.app.runtime.spark.SparkRuntimeService$5$1.run(SparkRuntimeService.java:404) [io.cdap.cdap.cdap-spark-core2_2.11-6.1.0.jar:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_222]
    Suppressed: org.apache.tephra.TransactionFailureException: Unable to roll back changes in transaction-aware 'RecordGenerator' for transaction 1574271280330000000
        at io.cdap.cdap.data2.transaction.AbstractTransactionContext.createTransactionFailure(AbstractTransactionContext.java:313) ~[na:na]
        at io.cdap.cdap.data2.transaction.AbstractTransactionContext.abort(AbstractTransactionContext.java:143) ~[na:na]
        ... 23 common frames omitted
Caused by: java.io.IOException: Database /data/ldb/cdap_default.RecordGenerator.kv does not exist and the create if missing option is disabled
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableService.openTable(LevelDBTableService.java:218)
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableService.getTable(LevelDBTableService.java:181)
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableCore.getDB(LevelDBTableCore.java:80)
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableCore.undo(LevelDBTableCore.java:184)
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTable.undoPersisted(LevelDBTable.java:113)
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTable.undo(LevelDBTable.java:108)
    at io.cdap.cdap.data2.dataset2.lib.table.BufferingTable.rollbackTx(BufferingTable.java:368)
    at io.cdap.cdap.api.dataset.lib.AbstractDataset.rollbackTx(AbstractDataset.java:101)
    at io.cdap.cdap.data2.transaction.AbstractTransactionContext.abort(AbstractTransactionContext.java:141)
    ... 23 common frames omitted
Caused by: java.io.IOException: Database /data/ldb/cdap_default.RecordGenerator.kv does not exist and the create if missing option is disabled
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableService.openTable(LevelDBTableService.java:218) ~[na:na]
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableService.getTable(LevelDBTableService.java:181) ~[na:na]
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableCore.getDB(LevelDBTableCore.java:80) ~[na:na]
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableCore.persist(LevelDBTableCore.java:164) ~[na:na]
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTable.persist(LevelDBTable.java:100) ~[na:na]
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTable.persist(LevelDBTable.java:92) ~[na:na]
    at io.cdap.cdap.data2.dataset2.lib.table.BufferingTable.commitTx(BufferingTable.java:351) ~[na:na]
    at io.cdap.cdap.api.dataset.lib.AbstractDataset.commitTx(AbstractDataset.java:91) ~[na:na]
    at io.cdap.cdap.data2.transaction.AbstractTransactionContext.persist(AbstractTransactionContext.java:255) ~[na:na]
    ... 22 common frames omitted

Ошибка, по моему мнению, вводит в заблуждение, поскольку ошибка происходит из следующего фрагмента кода внутри плагина:

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    super.configurePipeline(pipelineConfigurer);
    pipelineConfigurer.createDataset("RecordGenerator", KeyValueTable.class);
    pipelineConfigurer.getStageConfigurer().setOutputSchema(OUTPUT_SCHEMA);
}

@Override
@TransactionPolicy(TransactionControl.IMPLICIT)
public void prepareRun(BatchSourceContext context) throws IOException {
    KeyValueTable d = context.getDataset("RecordGenerator");
    d.write("numberOfRecords", Long.toString(config.numberOfRecords));
    context.setInput(Input.ofDataset("RecordGenerator"));
}

В частности, виновной является d.write("numberOfRecords", Long.toString(config.numberOfRecords));. Если я удаляю эту строку, плагин работает, но, очевидно, не запускает часть плагина transform.

У меня нет идей, поведение кажется странным в режиме предварительного просмотра, и документация (если есть)очень редко, чтобы сказать лучшее. Что я могу сделать, чтобы это работало?

1 Ответ

4 голосов
/ 21 ноября 2019

KeyValueTable не поддерживается в Data Fusion. Это работало в предварительном просмотре, потому что предварительный просмотр работает в локальном режиме. Если вы хотите сохранить что-либо в методе prepareRun(), вам нужно использовать другое хранилище. Простая альтернатива - использовать файл в gcs для хранения информации. Вот фрагмент кода, который вы можете использовать для записи информации в файл: https://github.com/data-integrations/kafka-plugins/blob/release/2.2/kafka-plugins-0.10/src/main/java/io/cdap/plugin/batch/source/KafkaBatchSource.java#L160-L167

...