У меня есть приложение, которое выполняет большую часть обработки данных (порядка ~ 1,3 миллиона одновременно), которая происходит пакетами. Приложение использует данные из темы кафки.
Я использую версию 2.0.1 spring-cloud-stream-starter-kafka для потребления данных.
Мой код выглядит следующим образом:
Слушатель:
@Service
public class ListenerService {
@Autowired
private Application2<Foo> application;
@Override
@StreamListener(FooStreams.INPUT)
public void subscribe(@Payload Foo foo) {
application.sync(foo);
}
}
Streams:
public interface FooStreams {
String INPUT = "Foo";
@Input(value = INPUT)
SubscribableChannel subscribe();
}
В основном приложении я привязал поток к kafka следующим образом:
@SpringBootApplication
@EnableBinding({FooStreams.class})
public class Application {
private static final Logger logger = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
try {
SpringApplication.run(Application.class, args);
}
catch (Exception e) {
logger.error("Application failed to start");
}
}
}
Есть что-то, чего мне не хватает? Проблема в том, что я вижу, что использование памяти увеличивается во время обработки данных, которая не снижается после завершения обработки.