Модульное тестирование в IntelliJ: AssertionError: java.lang.IllegalArgumentException: аргумент не может быть пустым - PullRequest
0 голосов
/ 23 сентября 2019

Я пытаюсь протестировать некоторый код Java (пользовательский процессор NiFi) в IntelliJ и получить ошибку:

java.lang.AssertionError: java.lang.IllegalArgumentException: аргумент не может быть пустым

для большинства runner.run(1) строк в моем тестовом коде (в основном для каждой строки, кроме первой runner.run(1) строки).

Я считаю, что это связано с кодом процессорагде я пытаюсь извлечь значение из тега XML, а затем добавить его в качестве атрибута.Я знаю как код для извлечения значения из тега xml (doc.getElementsByTagName("Security_Tbl_Indx")), так и код для добавления его в качестве атрибута (flowFile = session.putAttribute).

Возможно, мне нужно явно сказать, что делать, если атрибут равен нулю?Но не должны ли тесты пропускать цикл for, если securityList.getLength() равно 0?

Вот код процессора:

public static final PropertyDescriptor TCP_HOST = new PropertyDescriptor.Builder().name("TCP_HOST")
        .displayName("TCP Host").description("TCP Host").defaultValue("localhost").required(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();

public static final PropertyDescriptor TCP_PORT = new PropertyDescriptor.Builder().name("TCP_PORT")
        .displayName("TCP Port").description("TCP Port").defaultValue("20100").required(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();

public static final PropertyDescriptor MESSAGE_TAG = new PropertyDescriptor.Builder().name("MESSAGE_TAG")
        .displayName("Message Tag").description("XML Tag that contains the Message").defaultValue("CMF_Doc")
        .required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();

public static final PropertyDescriptor CONNECTION_EXPIRATION = new PropertyDescriptor.Builder().name("CONN_EXPIRE")
        .displayName("Connection Expiration")
        .description("Period of time after receiving no records when the TCP connection will be reinitialized. "
                + "Expected format is <duration> <time unit> where <duration> is a positive integer and time unit is one of seconds, minutes, hours")
        .defaultValue("15 minutes").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();

public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
        .description("Successfully processed incoming message").build();

public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
        .description("Failed to process an incoming message").build();

private List<PropertyDescriptor> descriptors;

private Set<Relationship> relationships;

private Socket kkSocket = null;
private String startTag;
private String endTag;
private boolean continueProcessing = true;

@Override
protected void init(final ProcessorInitializationContext context){
    final List<PropertyDescriptor> desc = new ArrayList<>();
    desc.add(TCP_HOST);
    desc.add(TCP_PORT);
    desc.add(MESSAGE_TAG);
    desc.add(CONNECTION_EXPIRATION);
    this.descriptors = Collections.unmodifiableList(desc);

    final Set<Relationship> rel = new HashSet<>();
    rel.add(REL_SUCCESS);
    rel.add(REL_FAILURE);
    this.relationships = Collections.unmodifiableSet(rel);
}

@Override
public Set<Relationship> getRelationships(){
    return this.relationships;
}

@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors(){
    return descriptors;
}

@OnScheduled
public void onScheduled(final ProcessContext context){
    continueProcessing = true;
}

@OnUnscheduled
public void onUnscheduled(){
    if(kkSocket != null && !kkSocket.isClosed()){
        try{
            kkSocket.shutdownInput();
        }catch(IOException e){
            getLogger().error("Error shutting down input on socket", e);
        }
    }
    continueProcessing = false;
}

@OnStopped
public void onStopped(){
    if(kkSocket != null && !kkSocket.isClosed()){
        try{
            kkSocket.close();
        }catch(IOException e){
            getLogger().error("Error closing socket", e);
        }
    }
    continueProcessing = false;
}



public String securityTableIndex;

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session){
    // No need to check if flowFile is null because this processor will
    // initiate the flow
    FlowFile flowFile = session.get();
    if(continueProcessing){
        String messageTag = context.getProperty(MESSAGE_TAG).evaluateAttributeExpressions(flowFile).getValue();
        this.startTag = "<" + messageTag;
        this.endTag = "</" + messageTag + ">";
        try(Socket socket = new Socket(
                context.getProperty(TCP_HOST).evaluateAttributeExpressions(flowFile).getValue(),
                context.getProperty(TCP_PORT).evaluateAttributeExpressions(flowFile).asInteger())){
            getLogger().info("Established connection.");
            socket.setSoTimeout(
                    context.getProperty(CONNECTION_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
            this.kkSocket = socket;

            // read data stream
            byte[] buffer = new byte[10240];
            StringBuilder sb = new StringBuilder();

            InputStream is = socket.getInputStream();
            int bytesRead;
            while((bytesRead = is.read(buffer)) != -1){
                // begin building the stringbuffer with data read from the
                // socket
                sb.append(new String(buffer, 0, bytesRead, "UTF-8"));

                // check to see if the buffer contains the eom tag
                int startIndex = sb.indexOf(startTag);
                int endIndex = sb.indexOf(endTag);
                while(endIndex != -1){
                    processXML(session, sb, startIndex, endIndex);

                    // then remove the captured message from the buffer
                    sb = new StringBuilder(sb.substring(sb.indexOf(endTag) + endTag.length()));

                    startIndex = sb.indexOf(startTag);
                    endIndex = sb.indexOf(endTag);
                } // end of if
            } // end of while
        }catch(SocketTimeoutException e){
            getLogger().error("Socket timed out.  Reinitializing.", e);
        }catch(IOException e){
            getLogger().error("Socket error", e);
        }
        getLogger().info("Closed connection.");
    }
}

private void processXML(final ProcessSession session, StringBuilder sb, int startIndex, int endIndex){
    if(startIndex != -1 && startIndex < endIndex){
        // capture the message from the start of the buffer to the first
        // occurrence of the EOM marker
        String xml = sb.substring(sb.indexOf(startTag), sb.indexOf(endTag) + endTag.length());
        if(getLogger().isDebugEnabled()){
            getLogger().debug("Extracted CMF message: " + xml);
        }


        boolean validMessage = isValid(xml);
        if(!validMessage){
            while(xml.indexOf(startTag, 1) != -1 && !validMessage){
                process(session, xml.substring(0, xml.indexOf(startTag, 1)), validMessage);
                xml = xml.substring(xml.indexOf(startTag, 1));
                validMessage = isValid(xml);
            }
        }

        // process the message and transfer to the success relationship
        // invalid xml transfer to the failed relationship
        this.process(session, xml, validMessage);
    }else{
        process(session, sb.substring(0, sb.indexOf(endTag) + endTag.length()), false);
    }
}

private boolean isValid(String xml){
    // assume valid message
    boolean validMessage = true;

    // determine if the captured message is complete
    // a complete message is a valid XML,
    // and an exception indicates an incomplete message
    try{
        final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
        factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
        factory.setFeature("http://xml.org/sax/features/external-general-entities", false);
        factory.setFeature("http://xml.org/sax/features/external-parameter-entities", false);
        factory.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd", false);
        factory.setXIncludeAware(false);
        factory.setExpandEntityReferences(false);

        final DocumentBuilder parser = factory.newDocumentBuilder();
        Document doc = parser.parse(new InputSource(new StringReader(xml)));

    // Grabs Security Index Number from header xml
        NodeList securityList = doc.getElementsByTagName("Security_Tbl_Indx");
          for(int i=0; i< securityList.getLength();i++) {
             Node security = securityList.item(i);
                  if(security.getNodeType()==Node.ELEMENT_NODE) {
                      Element securityNumber = (Element) security;
                      securityTableIndex = securityNumber.getTextContent();
                    }}

    }catch(IOException | SAXException | ParserConfigurationException | RuntimeException e){
        validMessage = false;
        getLogger().debug("Invalid XML. Dropping message.");
    } // end of try-catch

    return validMessage;
}

private void process(ProcessSession session, String message, boolean validMessage){
    // create a new flowfile and send it to the next processor
    FlowFile flowFile = session.create();

    // add index number grabbed from xml header as attribute to flowfile
    flowFile = session.putAttribute(flowFile, "securityTableIndex", securityTableIndex);


    flowFile = session.write(flowFile, new OutputStreamCallback(){
        @Override
        public void process(OutputStream out) throws IOException{
            if(getLogger().isDebugEnabled()){
                getLogger().debug("Writing CMF message to the FlowFile.");
            }
            out.write(message.getBytes("UTF-8"));
        }
    });

    if(validMessage){
        if(getLogger().isDebugEnabled()){
            getLogger().debug("Transferring FlowFile to the SUCCESS relationship.");
        }
        session.transfer(flowFile, REL_SUCCESS);
    }else{
        if(getLogger().isDebugEnabled()){
            getLogger().debug("Transferring FlowFile to the FAILURE relationship.");
        }
        session.transfer(flowFile, REL_FAILURE);
    }

    session.commit();
}
}

А вот код теста:

      public class ProcessorTest{
      final Logger LOGGER = LoggerFactory.getLogger(ProcessorTest.class);

private static final String MESSAGE_TAG = "Message";
private static final String PROLOGUE = "<?xml version=\"1.0\" encoding=\"utf-8\" ?>";
private static final String VALID_MESSAGE = "<Message><Test>1</Test><Name>name</Name><Security_Tbl_Indx>2</Security_Tbl_Indx></Message>";
private static final String START_TAG_MESSAGE = "<Message><Test>1</Test><Name>name</Name><Security_Tbl_Indx>2</Security_Tbl_Indx>";
private static final String END_TAG_MESSAGE = "<Test>1</Test><Name>name</Name><Security_Tbl_Indx>2</Security_Tbl_Indx></Message>";


@Test
public void testOnTriggerSuccess() throws InterruptedException, IOException{
    MockServer server = new MockServer(PROLOGUE + VALID_MESSAGE, PROLOGUE + VALID_MESSAGE,
            PROLOGUE + VALID_MESSAGE);

    final TestRunner runner = TestRunners.newTestRunner(new Processor());
    runner.setProperty(Processor.TCP_HOST, "localhost");
    runner.setProperty(Processor.TCP_PORT, Integer.toString(server.getPort()));
    runner.setProperty(Processor.MESSAGE_TAG, MESSAGE_TAG);

    server.start();

    runner.run(1);

    runner.assertTransferCount(Processor.REL_SUCCESS, 3);
    runner.assertTransferCount(Processor.REL_FAILURE, 0);

    MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(0);
    String content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
    assertThat(content, is(VALID_MESSAGE));

    mockFlowFile.assertAttributeExists("securityTableIndex");
    mockFlowFile.assertAttributeEquals("securityTableIndex", "2");

    mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(1);
    content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
    assertThat(content, is(VALID_MESSAGE));

    mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(2);
    content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
    assertThat(content, is(VALID_MESSAGE));

    server.join();
    assertThat(server.getSentMessages(), is(3));
}

@Test
public void testOnTriggerSuccessManyMessages() throws InterruptedException, IOException{
    String message = "<a>%d</a>";
    String[] messages = new String[100];
    for(int i = 0; i < messages.length; i++){
        messages[i] = String.format(message, i);
    }
    MockServer server = new MockServer(messages);

    final TestRunner runner = TestRunners.newTestRunner(new Processor());
    runner.setProperty(Processor.TCP_HOST, "localhost");
    runner.setProperty(Processor.TCP_PORT, Integer.toString(server.getPort()));
    runner.setProperty(Processor.MESSAGE_TAG, "a");

    server.start();

    runner.run(1);

    server.join();
    assertThat(server.getSentMessages(), is(messages.length));

    runner.assertTransferCount(Processor.REL_SUCCESS, messages.length);
    runner.assertTransferCount(Processor.REL_FAILURE, 0);

    for(int i = 0; i < messages.length; i++){
        MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(i);
        String content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
        assertThat(content, is(messages[i]));
    }
}

@Test
public void testOnTriggerSuccessLargeMessage() throws InterruptedException, IOException{
    StringBuilder message = new StringBuilder("<" + MESSAGE_TAG + ">");
    for(int i = 0; i < 10000; i++){
        message.append(String.format("<Test%d>%d</Test%d><Name%d>name%d</Name%d>", i, i, i, i, i, i));
    }
    message.append("</" + MESSAGE_TAG + ">");
    MockServer server = new MockServer(message.toString());

    final TestRunner runner = TestRunners.newTestRunner(new Processor());
    runner.setProperty(Processor.TCP_HOST, "localhost");
    runner.setProperty(Processor.TCP_PORT, Integer.toString(server.getPort()));
    runner.setProperty(Processor.MESSAGE_TAG, MESSAGE_TAG);

    server.start();

    runner.run(1);

    server.join();
    assertThat(server.getSentMessages(), is(1));

    runner.assertTransferCount(Processor.REL_SUCCESS, 1);
    runner.assertTransferCount(Processor.REL_FAILURE, 0);

    MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(0);
    String content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
    assertThat(content, is(message.toString()));
}

@Test
public void testOnTriggerEmpty() throws InterruptedException, IOException{
    MockServer server = new MockServer(VALID_MESSAGE);

    final TestRunner runner = TestRunners.newTestRunner(new Processor());
    runner.setProperty(Processor.TCP_HOST, "localhost");
    runner.setProperty(Processor.TCP_PORT, Integer.toString(server.getPort()));
    runner.setProperty(Processor.MESSAGE_TAG, MESSAGE_TAG.toLowerCase());

    server.start();

    runner.run(1);
    server.join();
    assertThat(server.getSentMessages(), is(1));

    runner.assertTransferCount(Processor.REL_SUCCESS, 0);
    runner.assertTransferCount(Processor.REL_FAILURE, 0);
}

@Test
public void testOnTriggerStartTag() throws InterruptedException, IOException{
    MockServer server = new MockServer(START_TAG_MESSAGE, START_TAG_MESSAGE, VALID_MESSAGE, START_TAG_MESSAGE,
            VALID_MESSAGE, START_TAG_MESSAGE);

    final TestRunner runner = TestRunners.newTestRunner(new Processor());
    runner.setProperty(Processor.TCP_HOST, "localhost");
    runner.setProperty(Processor.TCP_PORT, Integer.toString(server.getPort()));
    runner.setProperty(Processor.MESSAGE_TAG, "Message");

    server.start();

    runner.run(1);
    server.join();
    assertThat(server.getSentMessages(), is(6));

    runner.assertTransferCount(Processor.REL_SUCCESS, 2);
    runner.assertTransferCount(Processor.REL_FAILURE, 3);

    MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_FAILURE).get(0);
    String content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
    assertThat(content, is(START_TAG_MESSAGE));

    mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_FAILURE).get(1);
    content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
    assertThat(content, is(START_TAG_MESSAGE));

    mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_FAILURE).get(2);
    content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
    assertThat(content, is(START_TAG_MESSAGE));

    mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(0);
    content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
    assertThat(content, is(VALID_MESSAGE));

    mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(1);
    content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
    assertThat(content, is(VALID_MESSAGE));
}

@Test
public void testOnTriggerEndTag() throws InterruptedException, IOException{
    MockServer server = new MockServer(END_TAG_MESSAGE, VALID_MESSAGE, VALID_MESSAGE, END_TAG_MESSAGE,
            VALID_MESSAGE);

    final TestRunner runner = TestRunners.newTestRunner(new Processor());
    runner.setProperty(Processor.TCP_HOST, "localhost");
    runner.setProperty(Processor.TCP_PORT, Integer.toString(server.getPort()));
    runner.setProperty(Processor.MESSAGE_TAG, "Message");

    server.start();

    runner.run(1);
    server.join();
    assertThat(server.getSentMessages(), is(5));

    runner.assertTransferCount(Processor.REL_SUCCESS, 3);
    runner.assertTransferCount(Processor.REL_FAILURE, 2);

    MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_FAILURE).get(0);
    String content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
    assertThat(content, is(END_TAG_MESSAGE));

    mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_FAILURE).get(1);
    content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
    assertThat(content, is(END_TAG_MESSAGE));

    mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(0);
    content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
    assertThat(content, is(VALID_MESSAGE));

    mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(1);
    content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
    assertThat(content, is(VALID_MESSAGE));

    mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(2);
    content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
    assertThat(content, is(VALID_MESSAGE));
}

@Test
public void testOnTriggerBadMessage() throws InterruptedException, IOException{
    MockServer server = new MockServer(END_TAG_MESSAGE, START_TAG_MESSAGE, VALID_MESSAGE);

    final TestRunner runner = TestRunners.newTestRunner(new Processor());
    runner.setProperty(Processor.TCP_HOST, "localhost");
    runner.setProperty(Processor.TCP_PORT, Integer.toString(server.getPort()));
    runner.setProperty(Processor.MESSAGE_TAG, "Message");

    server.start();

    runner.run(1);

    server.join();
    assertThat(server.getSentMessages(), is(3));

    runner.assertTransferCount(Processor.REL_SUCCESS, 1);
    runner.assertTransferCount(Processor.REL_FAILURE, 2);

    MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_FAILURE).get(0);
    String content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
    assertThat(content, is(END_TAG_MESSAGE));

    mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_FAILURE).get(1);
    content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
    assertThat(content, is(START_TAG_MESSAGE));

    mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(0);
    content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
    assertThat(content, is(VALID_MESSAGE));
}

/**
 * Remove annotation since this method can fail if another server grabs the
 * port while this server is closed.
 */
public void testOnTriggerSuccessDisappearingServer() throws InterruptedException, IOException{
    DisappearingMockServer server = new DisappearingMockServer(PROLOGUE + VALID_MESSAGE, PROLOGUE + VALID_MESSAGE,
            PROLOGUE + VALID_MESSAGE);

    final TestRunner runner = TestRunners.newTestRunner(new Processor());
    runner.setProperty(Processor.TCP_HOST, "localhost");
    runner.setProperty(Processor.TCP_PORT, Integer.toString(server.getPort()));
    runner.setProperty(Processor.MESSAGE_TAG, MESSAGE_TAG);

    server.start();

    int count = 0;
    while(++count < 10 && server.getSentMessages() < 3){
        runner.run();
    }

    runner.assertTransferCount(Processor.REL_SUCCESS, 3);
    runner.assertTransferCount(Processor.REL_FAILURE, 0);

    MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(0);
    String content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
    assertThat(content, is(VALID_MESSAGE));

    mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(1);
    content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
    assertThat(content, is(VALID_MESSAGE));

    mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(2);
    content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
    assertThat(content, is(VALID_MESSAGE));

    server.join();
    assertThat(server.getSentMessages(), is(3));
}

@Test
public void testOnTriggerSuccessDisappearingServerReopen() throws InterruptedException, IOException{
    DisappearingMockServer2 server = new DisappearingMockServer2(PROLOGUE + VALID_MESSAGE, PROLOGUE + VALID_MESSAGE,
            PROLOGUE + VALID_MESSAGE);
    int port = server.getPort();

    final TestRunner runner = TestRunners.newTestRunner(new Processor());
    runner.setProperty(Processor.TCP_HOST, "localhost");
    runner.setProperty(Processor.TCP_PORT, Integer.toString(server.getPort()));
    runner.setProperty(Processor.MESSAGE_TAG, MESSAGE_TAG);

    server.start();

    runner.run(2);

    runner.assertTransferCount(Processor.REL_SUCCESS, 3);
    runner.assertTransferCount(Processor.REL_FAILURE, 0);

    MockServer server2 = new MockServer(port, PROLOGUE + VALID_MESSAGE, PROLOGUE + VALID_MESSAGE,
            PROLOGUE + VALID_MESSAGE);
    server2.start();

    runner.run(2);
    runner.assertTransferCount(Processor.REL_SUCCESS, 6);
    runner.assertTransferCount(Processor.REL_FAILURE, 0);

    for(int i = 0; i < 6; i++){
        MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(i);
        String content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
        assertThat(content, is(VALID_MESSAGE));
    }

    server.join();
    server2.join();
    assertThat(server.getSentMessages(), is(3));
    assertThat(server2.getSentMessages(), is(3));
}

@Test
public void testOnTriggerSuccessPausingServerReopen() throws InterruptedException, IOException{
    int expiration = 1;
    PausingMockServer server = new PausingMockServer(expiration * 1000 + 500, PROLOGUE + VALID_MESSAGE,
            PROLOGUE + VALID_MESSAGE, PROLOGUE + VALID_MESSAGE);

    final TestRunner runner = TestRunners.newTestRunner(new Processor());
    runner.setProperty(Processor.TCP_HOST, "localhost");
    runner.setProperty(Processor.TCP_PORT, Integer.toString(server.getPort()));
    runner.setProperty(Processor.MESSAGE_TAG, MESSAGE_TAG);
    runner.setProperty(Processor.CONNECTION_EXPIRATION, "" + expiration + " seconds");

    server.start();

    runner.run(2);

    runner.assertTransferCount(Processor.REL_SUCCESS, 6);
    runner.assertTransferCount(Processor.REL_FAILURE, 0);

    for(int i = 0; i < 6; i++){
        MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(i);
        String content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
        assertThat(content, is(VALID_MESSAGE));
    }

    server.join();
    runner.shutdown();
    assertThat(server.getSentMessages(), is(6));
}

private class MockServer extends Thread{
    protected String[] messages;
    protected int sentMessages = 0;
    protected final ServerSocket server;

    public MockServer(String... messages) throws IOException{
        this(0, messages);
    }

    public MockServer(int port, String... messages) throws IOException{
        this.messages = messages;
        server = new ServerSocket(port);
    }

    @Override
    public void run(){
        try(final Socket clientSocket = server.accept()){
            LOGGER.debug("Server start sending.");
            BufferedOutputStream output = new BufferedOutputStream(clientSocket.getOutputStream());

            for(String message : messages){
                sentMessages++;
                output.write(message.getBytes());
                output.flush();
            }

            clientSocket.shutdownOutput();
        }catch(IOException e){
            LOGGER.error("Error sending messages from Mock Server", e);
        }finally{
            try{
                server.close();
            }catch(IOException e){
                LOGGER.error("Error closing socket", e);
            }
        }
    }

    public int getSentMessages(){
        return sentMessages;
    }

    public int getPort(){
        return server.getLocalPort();
    }
}

private class DisappearingMockServer extends Thread{
    protected Queue<String> messages;
    protected int sentMessages = 0;
    protected ServerSocket server;
    protected final int port;

    public DisappearingMockServer(String... messages) throws IOException{
        this.messages = new LinkedList<>();
        Arrays.stream(messages).forEach(this.messages::add);
        server = new ServerSocket(0);
        port = server.getLocalPort();
        server.close();
    }

    @Override
    public void run(){
        int attempts = messages.size() * 2;
        while(!messages.isEmpty() & --attempts >= 0){
            try{
                server = new ServerSocket(port);
                try(final Socket clientSocket = server.accept()){
                    LOGGER.debug("Server start sending.");
                    BufferedOutputStream output = new BufferedOutputStream(clientSocket.getOutputStream());

                    sentMessages++;
                    LOGGER.debug("Sending message: " + sentMessages);
                    output.write(messages.poll().getBytes());
                    output.flush();

                    clientSocket.shutdownOutput();
                }catch(IOException e){
                    LOGGER.error("Error sending messages from Mock Server", e);
                }finally{
                    try{
                        server.close();
                    }catch(IOException e){
                        LOGGER.error("Error closing socket", e);
                    }
                }
                Thread.sleep(1000);
            }catch(IOException | InterruptedException e){
                LOGGER.error("Error occurred.", e);
            }
        }
        LOGGER.debug("Finished sending messages.");
    }

    public int getSentMessages(){
        return sentMessages;
    }

    public int getPort(){
        return port;
    }
}

private class DisappearingMockServer2 extends MockServer{
    public DisappearingMockServer2(String... messages) throws IOException{
        super(messages);
    }

    @Override
    public void run(){
        try(final Socket clientSocket = server.accept()){
            LOGGER.debug("Server start sending.");
            BufferedOutputStream output = new BufferedOutputStream(clientSocket.getOutputStream());

            for(String message : messages){
                sentMessages++;
                output.write(message.getBytes());
                output.flush();
            }
            throw new IllegalArgumentException("ERROR");
        }catch(IOException e){
            LOGGER.error("Error sending messages from Mock Server", e);
        }finally{
            try{
                server.close();
            }catch(IOException e){
                LOGGER.error("Error closing socket", e);
            }
        }
    }
}

private class PausingMockServer extends Thread{
    protected String[] messages;
    protected int sentMessages = 0;
    protected long pauseMillis;
    protected final ServerSocket server;

    public PausingMockServer(long pauseMillis, String... messages) throws IOException{
        this(pauseMillis, 0, messages);
    }

    public PausingMockServer(long pauseMillis, int port, String... messages) throws IOException{
        this.messages = messages;
        this.pauseMillis = pauseMillis;
        server = new ServerSocket(port);
    }

    @Override
    public void run(){
        try(final Socket clientSocket = server.accept()){
            LOGGER.debug("Server start sending.");
            BufferedOutputStream output = new BufferedOutputStream(clientSocket.getOutputStream());

            for(String message : messages){
                sentMessages++;
                output.write(message.getBytes());
                output.flush();
            }

            Thread.sleep(pauseMillis);

            clientSocket.shutdownOutput();
        }catch(IOException e){
            LOGGER.error("Error sending messages from Pausing Mock Server", e);
        }catch(InterruptedException e){
            LOGGER.error("Interrupted Error sending messages from Pausing Mock Server", e);
        }

        try(final Socket clientSocket = server.accept()){
            LOGGER.debug("Server start sending.");
            BufferedOutputStream output = new BufferedOutputStream(clientSocket.getOutputStream());

            for(String message : messages){
                sentMessages++;
                output.write(message.getBytes());
                output.flush();
            }

            clientSocket.shutdownOutput();
        }catch(IOException e){
            LOGGER.error("Error sending messages from Pausing Mock Server", e);
        }finally{
            try{
                server.close();
            }catch(IOException e){
                LOGGER.error("Error closing server socket", e);
            }
        }
    }

    public int getSentMessages(){
        return sentMessages;
    }

    public int getPort(){
        return server.getLocalPort();
    }
}
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...