После прочтения документации Redis о том, как работают потоки, я пришел к следующему, чтобы автоматически обрабатывать любые неподтвержденные, но ранее доставленные сообщения для потребителя:
// Check for any previously unacknowledged messages that were delivered to this consumer.
log.info("STREAM - Checking for previously unacknowledged messages for " + this.getClass().getSimpleName() + " event stream listener.");
String offset = "0";
while ((offset = processUnacknowledgedMessage(offset)) != null) {
log.info("STREAM - Finished processing one unacknowledged message for " + this.getClass().getSimpleName() + " event stream listener: " + offset);
}
log.info("STREAM - Finished checking for previously unacknowledged messages for " + this.getClass().getSimpleName() + " event stream listener.");
И метод, который обрабатывает сообщения:
/**
* Processes, and acknowledges the next previously delivered message, beginning
* at the given message id offset.
*
* @param offset The last read message id offset.
* @return The message that was just processed, or null if there are no more messages.
*/
public String processUnacknowledgedMessage(String offset) {
List<MapRecord> messages = streamRedisTemplate.opsForStream().read(Consumer.from(groupName(), consumerName()),
StreamReadOptions.empty().noack().count(1),
StreamOffset.create(streamKey(), ReadOffset.from(offset)));
String lastMessageId = null;
for (MapRecord message : messages) {
if (log.isDebugEnabled()) log.debug(String.format("STREAM - Processing event(%s) from stream(%s) during startup: %s", message.getId(), message.getStream(), message.getValue()));
processRecord(message);
if (log.isDebugEnabled()) log.debug(String.format("STREAM - Finished processing event(%s) from stream(%s) during startup.", message.getId(), message.getStream()));
streamRedisTemplate.opsForStream().acknowledge(groupName(), message);
lastMessageId = message.getId().getValue();
}
return lastMessageId;
}