Можем ли мы и как сохранить файлы потока apache nifi вместе с их атрибутами на диск, чтобы мы могли использовать / перечитать их для пользовательских тестовых случаев - PullRequest
0 голосов
/ 07 мая 2019

Я пытаюсь исследовать, как мы можем написать блок-тесты для Apache Nifi, чтобы я мог избежать цикла «изменить код, собрать nar, вставить nar в папку lib, перезапустить nifi». Тем не менее, я думаю, для этого мне также нужно записывать потоковые файлы на локальный диск и перезагружать их каждый раз, когда я запускаю тестовый блок. Я наткнулся на эту статью, в которой предлагается сериализовать потоковые файлы на диск, а затем прочитать эти файлы и поставить их в очередь на процессор при модульном тестировании, чтобы передать их на мой пользовательский процессор, который я сейчас разрабатываю. В статье предлагается использовать MergeContent с параметром FlowFileV3, а затем использовать PutFile. Мне удалось сохранить эти файлы в формате .pkg. Я читаю их обратно в коде моего модульного теста, как предложено в той же статье, используя процессоры GetFile, IndetifyMimeType и UnpackContent. Однако я делаю это в коде, как показано ниже:

//Get File
TestRunner getFileRunner = TestRunners.newTestRunner(new GetFile());
getFileRunner.setProperty(GetFile.DIRECTORY, "C:\\Mahesh\\delete\\serialized-flow-file-2");
getFileRunner.setProperty(GetFile.KEEP_SOURCE_FILE, "true");
getFileRunner.run(1);
List<MockFlowFile> getFileResult = getFileRunner.getFlowFilesForRelationship(GetFile.REL_SUCCESS);

List<? extends FlowFile> getFileFFResult = getFileResult;

//IdentifyMimeType
TestRunner identifyMimeTypeRunner = TestRunners.newTestRunner(new IdentifyMimeType());
identifyMimeTypeRunner.enqueue(getFileFFResult.toArray(new FlowFile[getFileFFResult.size()]));
identifyMimeTypeRunner.run(1);
List<MockFlowFile> identifyMimeTypeResult = identifyMimeTypeRunner.getFlowFilesForRelationship(IdentifyMimeType.REL_SUCCESS);

List<? extends FlowFile> identifyMimeTypeFFResult = identifyMimeTypeResult;

//UnpackContent
TestRunner unpackContentRunner = TestRunners.newTestRunner(new UnpackContent());
unpackContentRunner.enqueue(identifyMimeTypeFFResult.toArray(new FlowFile[identifyMimeTypeFFResult.size()]));
unpackContentRunner.run(1);
List<MockFlowFile> unpackContentResult = unpackContentRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);

List<? extends FlowFile> unpackContentFFResult = unpackContentResult;

Однако я получаю следующее исключение:

17:39:36.676 [pool-1-thread-1] INFO org.apache.nifi.processors.standard.GetFile - GetFile[id=2e2161db-48a7-4a13-b7dd-ec75ce2b30dc] added FlowFile[0,618912147321300.pkg,556530B] to flow
17:40:08.772 [pool-2-thread-1] INFO org.apache.nifi.processors.standard.IdentifyMimeType - IdentifyMimeType[id=aefc3abe-0820-48a0-8935-e905aeadb191] Identified FlowFile[0,618912147321300.pkg,556530B] as having MIME Type application/flowfile-v3
17:40:48.625 [pool-3-thread-1] ERROR org.apache.nifi.processors.standard.UnpackContent - UnpackContent[id=6840bef5-4e52-48ac-be2f-1f9580eeb144] UnpackContent[id=6840bef5-4e52-48ac-be2f-1f9580eeb144] failed to process due to java.lang.IllegalStateException: FlowFile[0,620665804273900.mockFlowFile,0B] already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed; rolling back session: java.lang.IllegalStateException: FlowFile[0,620665804273900.mockFlowFile,0B] already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed
17:40:48.630 [pool-3-thread-1] ERROR org.apache.nifi.processors.standard.UnpackContent - 
java.lang.IllegalStateException: FlowFile[0,620665804273900.mockFlowFile,0B] already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed
    at org.apache.nifi.util.MockProcessSession.validateState(MockProcessSession.java:1014)
    at org.apache.nifi.util.MockProcessSession.putAllAttributes(MockProcessSession.java:488)
    at org.apache.nifi.util.MockProcessSession.inheritAttributes(MockProcessSession.java:1044)
    at org.apache.nifi.util.MockProcessSession.create(MockProcessSession.java:299)
    at org.apache.nifi.util.MockProcessSession.create(MockProcessSession.java:62)
    at org.apache.nifi.processors.standard.UnpackContent$FlowFileStreamUnpacker$1.process(UnpackContent.java:415)
    at org.apache.nifi.util.MockProcessSession.read(MockProcessSession.java:547)
    at org.apache.nifi.util.MockProcessSession.read(MockProcessSession.java:529)
    at org.apache.nifi.processors.standard.UnpackContent$FlowFileStreamUnpacker.unpack(UnpackContent.java:409)
    at org.apache.nifi.processors.standard.UnpackContent.onTrigger(UnpackContent.java:255)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.util.StandardProcessorTestRunner$RunProcessor.call(StandardProcessorTestRunner.java:251)
    at org.apache.nifi.util.StandardProcessorTestRunner$RunProcessor.call(StandardProcessorTestRunner.java:245)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

У меня есть некоторые сомнения:

  1. Во-первых, очевидно, почему я получаю следующую ошибку: FlowFile already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed?

  2. Я делаю это правильно? Правильный ли мой подход для сохранения этих потоковых файлов с использованием MergeContent и PutFile и последующего чтения их с использованием GetFile, IndentifyMimeType и UnpackContent? Я думал подать на мой канал вывод UnpackContent на мой пользовательский процессор TestRunner? Это все правильно? Или это какой-то другой более предпочтительный / стандартный подход к этому, который мне просто не хватает?

  3. Сохранит ли этот подход атрибуты потоковых файлов (как сказано в статье), чтобы я мог вслепую ставить их в очередь для тестового запуска моего пользовательского процессора, и он будет работать чисто (если мне вообще удастся исправить вышеприведенное исключение)

Редактировать

Во время отладки я вошел в некоторые из этих классов инфраструктуры, а затем в оболочке отладки eclipse я сделал e.printStackTrace(), и он напечатал это:

java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:89)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:41)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:541)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:763)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:463)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:209)
Caused by: java.lang.AssertionError: java.lang.IllegalStateException: FlowFile[0,622261873281800.mockFlowFile,0B] already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed
    at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:201)
    at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:160)
    at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:155)
    at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:150)
    at MyCustomProcessorTest.testOnTrigger(MyCustomProcessorTest.java:47)
    ... 23 more
Caused by: java.lang.IllegalStateException: FlowFile[0,622261873281800.mockFlowFile,0B] already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed
    at org.apache.nifi.util.MockProcessSession.validateState(MockProcessSession.java:1014)
    at org.apache.nifi.util.MockProcessSession.putAllAttributes(MockProcessSession.java:488)
    at org.apache.nifi.util.MockProcessSession.inheritAttributes(MockProcessSession.java:1044)
    at org.apache.nifi.util.MockProcessSession.create(MockProcessSession.java:299)
    at org.apache.nifi.util.MockProcessSession.create(MockProcessSession.java:62)
    at org.apache.nifi.processors.standard.UnpackContent$FlowFileStreamUnpacker$1.process(UnpackContent.java:415)
    at org.apache.nifi.util.MockProcessSession.read(MockProcessSession.java:547)
    at org.apache.nifi.util.MockProcessSession.read(MockProcessSession.java:529)
    at org.apache.nifi.processors.standard.UnpackContent$FlowFileStreamUnpacker.unpack(UnpackContent.java:409)
    at org.apache.nifi.processors.standard.UnpackContent.onTrigger(UnpackContent.java:255)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.util.StandardProcessorTestRunner$RunProcessor.call(StandardProcessorTestRunner.java:251)
    at org.apache.nifi.util.StandardProcessorTestRunner$RunProcessor.call(StandardProcessorTestRunner.java:245)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

, где MyCustomProcessorTest.java:47 равно unpackContentRunner.run(1).

1 Ответ

3 голосов
/ 07 мая 2019

Фреймворк на самом деле не предназначен для написания тестов, объединяющих несколько процессоров.Макет макета предназначен для модульного тестирования отдельного процессора.

Существует множество различных способов настройки файла потока с помощью макета макета. Содержимое файла потока может быть получено из файла, строки, входного потока.или байтовый массив:

https://github.com/apache/nifi/blob/master/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java#L387-L453

Для настройки ожидаемых атрибутов файла потока можно указать дополнительную карту атрибутов.

Обычным подходом является настройкафайлы в src / test / resources для любых данных, ожидаемых вашим пользовательским процессором, а затем вызовите testRunner.enqueue (pathToTestFile).

...