Spring cloud stream kafka - у подписанного канала нет выхода - PullRequest
0 голосов
/ 02 ноября 2018

У меня есть приложение, которое выполняет большую часть обработки данных (порядка ~ 1,3 миллиона одновременно), которая происходит пакетами. Приложение использует данные из темы кафки.

Я использую версию 2.0.1 spring-cloud-stream-starter-kafka для потребления данных.

Мой код выглядит следующим образом:

Слушатель:

@Service
public class ListenerService {

    @Autowired
    private Application2<Foo> application;

    @Override
    @StreamListener(FooStreams.INPUT)
    public void subscribe(@Payload Foo foo) {
        application.sync(foo);
    }
}

Streams:

public interface FooStreams {

    String INPUT = "Foo";

    @Input(value = INPUT)
    SubscribableChannel subscribe();

}

В основном приложении я привязал поток к kafka следующим образом:

@SpringBootApplication
@EnableBinding({FooStreams.class})
public class Application {

    private static final Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        try {
            SpringApplication.run(Application.class, args);
        }
        catch (Exception e) {
            logger.error("Application failed to start");
        }

    }
}

Есть что-то, чего мне не хватает? Проблема в том, что я вижу, что использование памяти увеличивается во время обработки данных, которая не снижается после завершения обработки.

...