Я пытаюсь протестировать некоторый код Java (пользовательский процессор NiFi) в IntelliJ и получить ошибку:
java.lang.AssertionError: java.lang.IllegalArgumentException: аргумент не может быть пустым
для большинства runner.run(1)
строк в моем тестовом коде (в основном для каждой строки, кроме первой runner.run(1)
строки).
Я считаю, что это связано с кодом процессорагде я пытаюсь извлечь значение из тега XML, а затем добавить его в качестве атрибута.Я знаю как код для извлечения значения из тега xml (doc.getElementsByTagName("Security_Tbl_Indx")
), так и код для добавления его в качестве атрибута (flowFile = session.putAttribute
).
Возможно, мне нужно явно сказать, что делать, если атрибут равен нулю?Но не должны ли тесты пропускать цикл for, если securityList.getLength()
равно 0?
Вот код процессора:
public static final PropertyDescriptor TCP_HOST = new PropertyDescriptor.Builder().name("TCP_HOST")
.displayName("TCP Host").description("TCP Host").defaultValue("localhost").required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
public static final PropertyDescriptor TCP_PORT = new PropertyDescriptor.Builder().name("TCP_PORT")
.displayName("TCP Port").description("TCP Port").defaultValue("20100").required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
public static final PropertyDescriptor MESSAGE_TAG = new PropertyDescriptor.Builder().name("MESSAGE_TAG")
.displayName("Message Tag").description("XML Tag that contains the Message").defaultValue("CMF_Doc")
.required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
public static final PropertyDescriptor CONNECTION_EXPIRATION = new PropertyDescriptor.Builder().name("CONN_EXPIRE")
.displayName("Connection Expiration")
.description("Period of time after receiving no records when the TCP connection will be reinitialized. "
+ "Expected format is <duration> <time unit> where <duration> is a positive integer and time unit is one of seconds, minutes, hours")
.defaultValue("15 minutes").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("Successfully processed incoming message").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("Failed to process an incoming message").build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
private Socket kkSocket = null;
private String startTag;
private String endTag;
private boolean continueProcessing = true;
@Override
protected void init(final ProcessorInitializationContext context){
final List<PropertyDescriptor> desc = new ArrayList<>();
desc.add(TCP_HOST);
desc.add(TCP_PORT);
desc.add(MESSAGE_TAG);
desc.add(CONNECTION_EXPIRATION);
this.descriptors = Collections.unmodifiableList(desc);
final Set<Relationship> rel = new HashSet<>();
rel.add(REL_SUCCESS);
rel.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(rel);
}
@Override
public Set<Relationship> getRelationships(){
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors(){
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context){
continueProcessing = true;
}
@OnUnscheduled
public void onUnscheduled(){
if(kkSocket != null && !kkSocket.isClosed()){
try{
kkSocket.shutdownInput();
}catch(IOException e){
getLogger().error("Error shutting down input on socket", e);
}
}
continueProcessing = false;
}
@OnStopped
public void onStopped(){
if(kkSocket != null && !kkSocket.isClosed()){
try{
kkSocket.close();
}catch(IOException e){
getLogger().error("Error closing socket", e);
}
}
continueProcessing = false;
}
public String securityTableIndex;
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session){
// No need to check if flowFile is null because this processor will
// initiate the flow
FlowFile flowFile = session.get();
if(continueProcessing){
String messageTag = context.getProperty(MESSAGE_TAG).evaluateAttributeExpressions(flowFile).getValue();
this.startTag = "<" + messageTag;
this.endTag = "</" + messageTag + ">";
try(Socket socket = new Socket(
context.getProperty(TCP_HOST).evaluateAttributeExpressions(flowFile).getValue(),
context.getProperty(TCP_PORT).evaluateAttributeExpressions(flowFile).asInteger())){
getLogger().info("Established connection.");
socket.setSoTimeout(
context.getProperty(CONNECTION_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
this.kkSocket = socket;
// read data stream
byte[] buffer = new byte[10240];
StringBuilder sb = new StringBuilder();
InputStream is = socket.getInputStream();
int bytesRead;
while((bytesRead = is.read(buffer)) != -1){
// begin building the stringbuffer with data read from the
// socket
sb.append(new String(buffer, 0, bytesRead, "UTF-8"));
// check to see if the buffer contains the eom tag
int startIndex = sb.indexOf(startTag);
int endIndex = sb.indexOf(endTag);
while(endIndex != -1){
processXML(session, sb, startIndex, endIndex);
// then remove the captured message from the buffer
sb = new StringBuilder(sb.substring(sb.indexOf(endTag) + endTag.length()));
startIndex = sb.indexOf(startTag);
endIndex = sb.indexOf(endTag);
} // end of if
} // end of while
}catch(SocketTimeoutException e){
getLogger().error("Socket timed out. Reinitializing.", e);
}catch(IOException e){
getLogger().error("Socket error", e);
}
getLogger().info("Closed connection.");
}
}
private void processXML(final ProcessSession session, StringBuilder sb, int startIndex, int endIndex){
if(startIndex != -1 && startIndex < endIndex){
// capture the message from the start of the buffer to the first
// occurrence of the EOM marker
String xml = sb.substring(sb.indexOf(startTag), sb.indexOf(endTag) + endTag.length());
if(getLogger().isDebugEnabled()){
getLogger().debug("Extracted CMF message: " + xml);
}
boolean validMessage = isValid(xml);
if(!validMessage){
while(xml.indexOf(startTag, 1) != -1 && !validMessage){
process(session, xml.substring(0, xml.indexOf(startTag, 1)), validMessage);
xml = xml.substring(xml.indexOf(startTag, 1));
validMessage = isValid(xml);
}
}
// process the message and transfer to the success relationship
// invalid xml transfer to the failed relationship
this.process(session, xml, validMessage);
}else{
process(session, sb.substring(0, sb.indexOf(endTag) + endTag.length()), false);
}
}
private boolean isValid(String xml){
// assume valid message
boolean validMessage = true;
// determine if the captured message is complete
// a complete message is a valid XML,
// and an exception indicates an incomplete message
try{
final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
factory.setFeature("http://xml.org/sax/features/external-general-entities", false);
factory.setFeature("http://xml.org/sax/features/external-parameter-entities", false);
factory.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd", false);
factory.setXIncludeAware(false);
factory.setExpandEntityReferences(false);
final DocumentBuilder parser = factory.newDocumentBuilder();
Document doc = parser.parse(new InputSource(new StringReader(xml)));
// Grabs Security Index Number from header xml
NodeList securityList = doc.getElementsByTagName("Security_Tbl_Indx");
for(int i=0; i< securityList.getLength();i++) {
Node security = securityList.item(i);
if(security.getNodeType()==Node.ELEMENT_NODE) {
Element securityNumber = (Element) security;
securityTableIndex = securityNumber.getTextContent();
}}
}catch(IOException | SAXException | ParserConfigurationException | RuntimeException e){
validMessage = false;
getLogger().debug("Invalid XML. Dropping message.");
} // end of try-catch
return validMessage;
}
private void process(ProcessSession session, String message, boolean validMessage){
// create a new flowfile and send it to the next processor
FlowFile flowFile = session.create();
// add index number grabbed from xml header as attribute to flowfile
flowFile = session.putAttribute(flowFile, "securityTableIndex", securityTableIndex);
flowFile = session.write(flowFile, new OutputStreamCallback(){
@Override
public void process(OutputStream out) throws IOException{
if(getLogger().isDebugEnabled()){
getLogger().debug("Writing CMF message to the FlowFile.");
}
out.write(message.getBytes("UTF-8"));
}
});
if(validMessage){
if(getLogger().isDebugEnabled()){
getLogger().debug("Transferring FlowFile to the SUCCESS relationship.");
}
session.transfer(flowFile, REL_SUCCESS);
}else{
if(getLogger().isDebugEnabled()){
getLogger().debug("Transferring FlowFile to the FAILURE relationship.");
}
session.transfer(flowFile, REL_FAILURE);
}
session.commit();
}
}
А вот код теста:
public class ProcessorTest{
final Logger LOGGER = LoggerFactory.getLogger(ProcessorTest.class);
private static final String MESSAGE_TAG = "Message";
private static final String PROLOGUE = "<?xml version=\"1.0\" encoding=\"utf-8\" ?>";
private static final String VALID_MESSAGE = "<Message><Test>1</Test><Name>name</Name><Security_Tbl_Indx>2</Security_Tbl_Indx></Message>";
private static final String START_TAG_MESSAGE = "<Message><Test>1</Test><Name>name</Name><Security_Tbl_Indx>2</Security_Tbl_Indx>";
private static final String END_TAG_MESSAGE = "<Test>1</Test><Name>name</Name><Security_Tbl_Indx>2</Security_Tbl_Indx></Message>";
@Test
public void testOnTriggerSuccess() throws InterruptedException, IOException{
MockServer server = new MockServer(PROLOGUE + VALID_MESSAGE, PROLOGUE + VALID_MESSAGE,
PROLOGUE + VALID_MESSAGE);
final TestRunner runner = TestRunners.newTestRunner(new Processor());
runner.setProperty(Processor.TCP_HOST, "localhost");
runner.setProperty(Processor.TCP_PORT, Integer.toString(server.getPort()));
runner.setProperty(Processor.MESSAGE_TAG, MESSAGE_TAG);
server.start();
runner.run(1);
runner.assertTransferCount(Processor.REL_SUCCESS, 3);
runner.assertTransferCount(Processor.REL_FAILURE, 0);
MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(0);
String content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(VALID_MESSAGE));
mockFlowFile.assertAttributeExists("securityTableIndex");
mockFlowFile.assertAttributeEquals("securityTableIndex", "2");
mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(1);
content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(VALID_MESSAGE));
mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(2);
content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(VALID_MESSAGE));
server.join();
assertThat(server.getSentMessages(), is(3));
}
@Test
public void testOnTriggerSuccessManyMessages() throws InterruptedException, IOException{
String message = "<a>%d</a>";
String[] messages = new String[100];
for(int i = 0; i < messages.length; i++){
messages[i] = String.format(message, i);
}
MockServer server = new MockServer(messages);
final TestRunner runner = TestRunners.newTestRunner(new Processor());
runner.setProperty(Processor.TCP_HOST, "localhost");
runner.setProperty(Processor.TCP_PORT, Integer.toString(server.getPort()));
runner.setProperty(Processor.MESSAGE_TAG, "a");
server.start();
runner.run(1);
server.join();
assertThat(server.getSentMessages(), is(messages.length));
runner.assertTransferCount(Processor.REL_SUCCESS, messages.length);
runner.assertTransferCount(Processor.REL_FAILURE, 0);
for(int i = 0; i < messages.length; i++){
MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(i);
String content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(messages[i]));
}
}
@Test
public void testOnTriggerSuccessLargeMessage() throws InterruptedException, IOException{
StringBuilder message = new StringBuilder("<" + MESSAGE_TAG + ">");
for(int i = 0; i < 10000; i++){
message.append(String.format("<Test%d>%d</Test%d><Name%d>name%d</Name%d>", i, i, i, i, i, i));
}
message.append("</" + MESSAGE_TAG + ">");
MockServer server = new MockServer(message.toString());
final TestRunner runner = TestRunners.newTestRunner(new Processor());
runner.setProperty(Processor.TCP_HOST, "localhost");
runner.setProperty(Processor.TCP_PORT, Integer.toString(server.getPort()));
runner.setProperty(Processor.MESSAGE_TAG, MESSAGE_TAG);
server.start();
runner.run(1);
server.join();
assertThat(server.getSentMessages(), is(1));
runner.assertTransferCount(Processor.REL_SUCCESS, 1);
runner.assertTransferCount(Processor.REL_FAILURE, 0);
MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(0);
String content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(message.toString()));
}
@Test
public void testOnTriggerEmpty() throws InterruptedException, IOException{
MockServer server = new MockServer(VALID_MESSAGE);
final TestRunner runner = TestRunners.newTestRunner(new Processor());
runner.setProperty(Processor.TCP_HOST, "localhost");
runner.setProperty(Processor.TCP_PORT, Integer.toString(server.getPort()));
runner.setProperty(Processor.MESSAGE_TAG, MESSAGE_TAG.toLowerCase());
server.start();
runner.run(1);
server.join();
assertThat(server.getSentMessages(), is(1));
runner.assertTransferCount(Processor.REL_SUCCESS, 0);
runner.assertTransferCount(Processor.REL_FAILURE, 0);
}
@Test
public void testOnTriggerStartTag() throws InterruptedException, IOException{
MockServer server = new MockServer(START_TAG_MESSAGE, START_TAG_MESSAGE, VALID_MESSAGE, START_TAG_MESSAGE,
VALID_MESSAGE, START_TAG_MESSAGE);
final TestRunner runner = TestRunners.newTestRunner(new Processor());
runner.setProperty(Processor.TCP_HOST, "localhost");
runner.setProperty(Processor.TCP_PORT, Integer.toString(server.getPort()));
runner.setProperty(Processor.MESSAGE_TAG, "Message");
server.start();
runner.run(1);
server.join();
assertThat(server.getSentMessages(), is(6));
runner.assertTransferCount(Processor.REL_SUCCESS, 2);
runner.assertTransferCount(Processor.REL_FAILURE, 3);
MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_FAILURE).get(0);
String content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(START_TAG_MESSAGE));
mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_FAILURE).get(1);
content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(START_TAG_MESSAGE));
mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_FAILURE).get(2);
content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(START_TAG_MESSAGE));
mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(0);
content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(VALID_MESSAGE));
mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(1);
content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(VALID_MESSAGE));
}
@Test
public void testOnTriggerEndTag() throws InterruptedException, IOException{
MockServer server = new MockServer(END_TAG_MESSAGE, VALID_MESSAGE, VALID_MESSAGE, END_TAG_MESSAGE,
VALID_MESSAGE);
final TestRunner runner = TestRunners.newTestRunner(new Processor());
runner.setProperty(Processor.TCP_HOST, "localhost");
runner.setProperty(Processor.TCP_PORT, Integer.toString(server.getPort()));
runner.setProperty(Processor.MESSAGE_TAG, "Message");
server.start();
runner.run(1);
server.join();
assertThat(server.getSentMessages(), is(5));
runner.assertTransferCount(Processor.REL_SUCCESS, 3);
runner.assertTransferCount(Processor.REL_FAILURE, 2);
MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_FAILURE).get(0);
String content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(END_TAG_MESSAGE));
mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_FAILURE).get(1);
content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(END_TAG_MESSAGE));
mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(0);
content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(VALID_MESSAGE));
mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(1);
content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(VALID_MESSAGE));
mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(2);
content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(VALID_MESSAGE));
}
@Test
public void testOnTriggerBadMessage() throws InterruptedException, IOException{
MockServer server = new MockServer(END_TAG_MESSAGE, START_TAG_MESSAGE, VALID_MESSAGE);
final TestRunner runner = TestRunners.newTestRunner(new Processor());
runner.setProperty(Processor.TCP_HOST, "localhost");
runner.setProperty(Processor.TCP_PORT, Integer.toString(server.getPort()));
runner.setProperty(Processor.MESSAGE_TAG, "Message");
server.start();
runner.run(1);
server.join();
assertThat(server.getSentMessages(), is(3));
runner.assertTransferCount(Processor.REL_SUCCESS, 1);
runner.assertTransferCount(Processor.REL_FAILURE, 2);
MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_FAILURE).get(0);
String content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(END_TAG_MESSAGE));
mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_FAILURE).get(1);
content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(START_TAG_MESSAGE));
mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(0);
content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(VALID_MESSAGE));
}
/**
* Remove annotation since this method can fail if another server grabs the
* port while this server is closed.
*/
public void testOnTriggerSuccessDisappearingServer() throws InterruptedException, IOException{
DisappearingMockServer server = new DisappearingMockServer(PROLOGUE + VALID_MESSAGE, PROLOGUE + VALID_MESSAGE,
PROLOGUE + VALID_MESSAGE);
final TestRunner runner = TestRunners.newTestRunner(new Processor());
runner.setProperty(Processor.TCP_HOST, "localhost");
runner.setProperty(Processor.TCP_PORT, Integer.toString(server.getPort()));
runner.setProperty(Processor.MESSAGE_TAG, MESSAGE_TAG);
server.start();
int count = 0;
while(++count < 10 && server.getSentMessages() < 3){
runner.run();
}
runner.assertTransferCount(Processor.REL_SUCCESS, 3);
runner.assertTransferCount(Processor.REL_FAILURE, 0);
MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(0);
String content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(VALID_MESSAGE));
mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(1);
content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(VALID_MESSAGE));
mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(2);
content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(VALID_MESSAGE));
server.join();
assertThat(server.getSentMessages(), is(3));
}
@Test
public void testOnTriggerSuccessDisappearingServerReopen() throws InterruptedException, IOException{
DisappearingMockServer2 server = new DisappearingMockServer2(PROLOGUE + VALID_MESSAGE, PROLOGUE + VALID_MESSAGE,
PROLOGUE + VALID_MESSAGE);
int port = server.getPort();
final TestRunner runner = TestRunners.newTestRunner(new Processor());
runner.setProperty(Processor.TCP_HOST, "localhost");
runner.setProperty(Processor.TCP_PORT, Integer.toString(server.getPort()));
runner.setProperty(Processor.MESSAGE_TAG, MESSAGE_TAG);
server.start();
runner.run(2);
runner.assertTransferCount(Processor.REL_SUCCESS, 3);
runner.assertTransferCount(Processor.REL_FAILURE, 0);
MockServer server2 = new MockServer(port, PROLOGUE + VALID_MESSAGE, PROLOGUE + VALID_MESSAGE,
PROLOGUE + VALID_MESSAGE);
server2.start();
runner.run(2);
runner.assertTransferCount(Processor.REL_SUCCESS, 6);
runner.assertTransferCount(Processor.REL_FAILURE, 0);
for(int i = 0; i < 6; i++){
MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(i);
String content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(VALID_MESSAGE));
}
server.join();
server2.join();
assertThat(server.getSentMessages(), is(3));
assertThat(server2.getSentMessages(), is(3));
}
@Test
public void testOnTriggerSuccessPausingServerReopen() throws InterruptedException, IOException{
int expiration = 1;
PausingMockServer server = new PausingMockServer(expiration * 1000 + 500, PROLOGUE + VALID_MESSAGE,
PROLOGUE + VALID_MESSAGE, PROLOGUE + VALID_MESSAGE);
final TestRunner runner = TestRunners.newTestRunner(new Processor());
runner.setProperty(Processor.TCP_HOST, "localhost");
runner.setProperty(Processor.TCP_PORT, Integer.toString(server.getPort()));
runner.setProperty(Processor.MESSAGE_TAG, MESSAGE_TAG);
runner.setProperty(Processor.CONNECTION_EXPIRATION, "" + expiration + " seconds");
server.start();
runner.run(2);
runner.assertTransferCount(Processor.REL_SUCCESS, 6);
runner.assertTransferCount(Processor.REL_FAILURE, 0);
for(int i = 0; i < 6; i++){
MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(Processor.REL_SUCCESS).get(i);
String content = new String(runner.getContentAsByteArray(mockFlowFile), "UTF-8");
assertThat(content, is(VALID_MESSAGE));
}
server.join();
runner.shutdown();
assertThat(server.getSentMessages(), is(6));
}
private class MockServer extends Thread{
protected String[] messages;
protected int sentMessages = 0;
protected final ServerSocket server;
public MockServer(String... messages) throws IOException{
this(0, messages);
}
public MockServer(int port, String... messages) throws IOException{
this.messages = messages;
server = new ServerSocket(port);
}
@Override
public void run(){
try(final Socket clientSocket = server.accept()){
LOGGER.debug("Server start sending.");
BufferedOutputStream output = new BufferedOutputStream(clientSocket.getOutputStream());
for(String message : messages){
sentMessages++;
output.write(message.getBytes());
output.flush();
}
clientSocket.shutdownOutput();
}catch(IOException e){
LOGGER.error("Error sending messages from Mock Server", e);
}finally{
try{
server.close();
}catch(IOException e){
LOGGER.error("Error closing socket", e);
}
}
}
public int getSentMessages(){
return sentMessages;
}
public int getPort(){
return server.getLocalPort();
}
}
private class DisappearingMockServer extends Thread{
protected Queue<String> messages;
protected int sentMessages = 0;
protected ServerSocket server;
protected final int port;
public DisappearingMockServer(String... messages) throws IOException{
this.messages = new LinkedList<>();
Arrays.stream(messages).forEach(this.messages::add);
server = new ServerSocket(0);
port = server.getLocalPort();
server.close();
}
@Override
public void run(){
int attempts = messages.size() * 2;
while(!messages.isEmpty() & --attempts >= 0){
try{
server = new ServerSocket(port);
try(final Socket clientSocket = server.accept()){
LOGGER.debug("Server start sending.");
BufferedOutputStream output = new BufferedOutputStream(clientSocket.getOutputStream());
sentMessages++;
LOGGER.debug("Sending message: " + sentMessages);
output.write(messages.poll().getBytes());
output.flush();
clientSocket.shutdownOutput();
}catch(IOException e){
LOGGER.error("Error sending messages from Mock Server", e);
}finally{
try{
server.close();
}catch(IOException e){
LOGGER.error("Error closing socket", e);
}
}
Thread.sleep(1000);
}catch(IOException | InterruptedException e){
LOGGER.error("Error occurred.", e);
}
}
LOGGER.debug("Finished sending messages.");
}
public int getSentMessages(){
return sentMessages;
}
public int getPort(){
return port;
}
}
private class DisappearingMockServer2 extends MockServer{
public DisappearingMockServer2(String... messages) throws IOException{
super(messages);
}
@Override
public void run(){
try(final Socket clientSocket = server.accept()){
LOGGER.debug("Server start sending.");
BufferedOutputStream output = new BufferedOutputStream(clientSocket.getOutputStream());
for(String message : messages){
sentMessages++;
output.write(message.getBytes());
output.flush();
}
throw new IllegalArgumentException("ERROR");
}catch(IOException e){
LOGGER.error("Error sending messages from Mock Server", e);
}finally{
try{
server.close();
}catch(IOException e){
LOGGER.error("Error closing socket", e);
}
}
}
}
private class PausingMockServer extends Thread{
protected String[] messages;
protected int sentMessages = 0;
protected long pauseMillis;
protected final ServerSocket server;
public PausingMockServer(long pauseMillis, String... messages) throws IOException{
this(pauseMillis, 0, messages);
}
public PausingMockServer(long pauseMillis, int port, String... messages) throws IOException{
this.messages = messages;
this.pauseMillis = pauseMillis;
server = new ServerSocket(port);
}
@Override
public void run(){
try(final Socket clientSocket = server.accept()){
LOGGER.debug("Server start sending.");
BufferedOutputStream output = new BufferedOutputStream(clientSocket.getOutputStream());
for(String message : messages){
sentMessages++;
output.write(message.getBytes());
output.flush();
}
Thread.sleep(pauseMillis);
clientSocket.shutdownOutput();
}catch(IOException e){
LOGGER.error("Error sending messages from Pausing Mock Server", e);
}catch(InterruptedException e){
LOGGER.error("Interrupted Error sending messages from Pausing Mock Server", e);
}
try(final Socket clientSocket = server.accept()){
LOGGER.debug("Server start sending.");
BufferedOutputStream output = new BufferedOutputStream(clientSocket.getOutputStream());
for(String message : messages){
sentMessages++;
output.write(message.getBytes());
output.flush();
}
clientSocket.shutdownOutput();
}catch(IOException e){
LOGGER.error("Error sending messages from Pausing Mock Server", e);
}finally{
try{
server.close();
}catch(IOException e){
LOGGER.error("Error closing server socket", e);
}
}
}
public int getSentMessages(){
return sentMessages;
}
public int getPort(){
return server.getLocalPort();
}
}
}