Мне нужно реализовать службу Non-Blocking Producer / Consumer для потоковой обработки, которая предоставляет HTTP API. Мне нужно использовать исполняемый файл черного ящика, который выплевывает бесконечный поток строк данных о событиях, закодированных в JSON:
Я создал Spring Boot приложение, оно компилируется и запускается, но ничего не происходит: файл EXE работает, но приложение ничего не делает, оно продолжает слушать и ничего больше , Что я делаю не так?
Я запускаю свое приложение как Java-приложение в IDEA IntelliJ.
Кроме того, я думаю, что мое приложение может быть лучше, например:
1. Реализация разделения чтения / записи правильно.
2. Приращение типа MutableInt не является потокобезопасным, я хочу другой способ сделать то же самое без него.
3. Печать результатов (сериализация) не была стандартной
4. Обработка ошибок, как правило, включает в себя синтаксический анализ ввода
5. Обычно именование методов не всегда отражает функциональность.
Я новичок в этом потоковом приложении и буду признателен за любые советы, подсказки и помощника.
Я думал об использовании RxJava для своего приложения, будет ли это лучше или лучше использовать RabbitMQ?
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.beniregev</groupId>
<artifactId>bigpanda-home-exercise</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>bigpanda-home-exercise</name>
<description>Big Panda home exercise</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
ресурсы / application.properties
stream.generator.exe.path=D:\\JavaProjects\\IdeaProjects\\bigpanda-home-exercise\\generator-windows-amd64.exe
StreamEvent.java
package com.beniregev.bigpandahomeexercise.models;
public class StreamEvent {
private String type;
private String data;
private String timestamp;
/**
*
* @return {@code Event-Type} as {@link String}.
*/
public String getType() {
return type;
}
/**
*
* @param type The event-type to set
*/
public void setType(String type) {
this.type = type;
}
/**
*
* @return {@code data} property of the Event
*/
public String getData() {
return data;
}
/**
*
* @param dataThe value to set {@code data} property
*/
public void setData(String data) {
this.data = data;
}
/**
*
* @return {@code timestamp} property of the Event
*/
public String getTimestamp() {
return timestamp;
}
/**
*
* @param timestampthe timestamp value to set
*/
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
}
CountersService.java
package com.beniregev.bigpandahomeexercise.services;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Service
public class CountersService {
private Map<String, MutableInt> words = new HashMap<String, MutableInt>();
private Map<String, MutableInt> types = new HashMap<String, MutableInt>();
public void countWords(String word) {
MutableInt count = words.get(word);
if (count == null) {
words.put(word, new MutableInt());
}
else {
count.increment();
}
}
public void countTypes(String type) {
MutableInt count = types.get(type);
if (count == null) {
types.put(type, new MutableInt());
}
else {
count.increment();
}
}
private String printMap(Map<String, MutableInt> counters) {
StringBuffer strb = new StringBuffer();
counters.entrySet().forEach(entry -> {strb.append(entry.getKey() + " -> " + entry.getValue().get() +", ");});
return strb.toString().substring(0, strb.length()-2);
}
public String printEventsTypesCounter() {
return printMap(types);
}
public String printWordsCounter() {
return printMap(words);
}
class MutableInt {
int value = 1;
public void increment () { ++value; }
public int get () { return value; }
}
}
StreamProcessService.java
package com.beniregev.bigpandahomeexercise.services;
import com.beniregev.bigpandahomeexercise.models.StreamEvent;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
@Component
public class StreamProcessService {
@Value("${stream.generator.exe.path}")
String path;
@Autowired
CountersService countService;
final ObjectMapper jsonMapper = new ObjectMapper();
@PostConstruct
public void init() {
try {
Process process = Runtime.getRuntime()
.exec(path);
EventStream eventStream =
new EventStream(process.getInputStream(), streamEvent -> {
if(streamEvent != null) {
countService.countTypes(streamEvent.getType());
countService.countWords(streamEvent.getData());
}
});
Executors.newSingleThreadExecutor().submit(eventStream);
} catch(Exception e) {
e.printStackTrace();
}
}
private class EventStream implements Runnable {
private InputStream inputStream;
private Consumer<StreamEvent> consumer;
public EventStream(InputStream inputStream, Consumer<StreamEvent> consumer) {
this.inputStream = inputStream;
this.consumer = consumer;
}
@Override
public void run() {
new BufferedReader(new InputStreamReader(inputStream)).lines()
.map(s -> {
StreamEvent streamEvent = null;
try {
streamEvent = jsonMapper.readValue(s, StreamEvent.class);
}catch(Exception e) {
//bad JSON String. Skip it
}
return streamEvent;
}).forEach( consumer);
}
}
}
DisplayOutputController.java
package com.beniregev.bigpandahomeexercise.controllers;
import com.beniregev.bigpandahomeexercise.services.CountersService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController("DisplayOutputController")
public class DisplayOutputController {
@Autowired
CountersService countService;
@GetMapping("/test")
public String test() {
return "<body><p><b>Test</body>";
}
@GetMapping("/countEventsTypes")
public String countTypes(){
return countService.printEventsTypesCounter();
}
@GetMapping("/countWords")
public String countWords(){
return countService.printWordsCounter();
}
}
BigpandaHomeExerciseApplication.java
package com.beniregev.bigpandahomeexercise;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@EnableAutoConfiguration
@ComponentScan({"com.beniregev.bigpandahomeexercise"})
public class BigpandaHomeExerciseApplication {
public static void main(String[] args) {
SpringApplication.run(BigpandaHomeExerciseApplication.class, args);
}
}