В моем процессоре API я храню сообщения в хранилище значений ключей, и каждые 100 сообщений я делаю запрос POST
. Если что-то не получается при попытке отправить сообщение (API не отвечает и т. Д. c.) Я хочу прекратить обработку сообщений. Пока нет доказательств того, что вызовы API работают. Вот мой код:
public class BulkProcessor implements Processor<byte[], UserEvent> {
private KeyValueStore<Integer, ArrayList<UserEvent>> keyValueStore;
private BulkAPIClient bulkClient;
private String storeName;
private ProcessorContext context;
private int count;
@Autowired
public BulkProcessor(String storeName, BulkClient bulkClient) {
this.storeName = storeName;
this.bulkClient = bulkClient;
}
@Override
public void init(ProcessorContext context) {
this.context = context;
keyValueStore = (KeyValueStore<Integer, ArrayList<UserEvent>>) context.getStateStore(storeName);
count = 0;
// to check every 15 minutes if there are any remainders in the store that are not sent yet
this.context.schedule(Duration.ofMinutes(15), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
if (count > 0) {
sendEntriesFromStore();
}
});
}
@Override
public void process(byte[] key, UserEvent value) {
int userGroupId = Integer.valueOf(value.getUserGroupId());
ArrayList<UserEvent> userEventArrayList = keyValueStore.get(userGroupId);
if (userEventArrayList == null) {
userEventArrayList = new ArrayList<>();
}
userEventArrayList.add(value);
keyValueStore.put(userGroupId, userEventArrayList);
if (count == 100) {
sendEntriesFromStore();
}
}
private void sendEntriesFromStore() {
KeyValueIterator<Integer, ArrayList<UserEvent>> iterator = keyValueStore.all();
while (iterator.hasNext()) {
KeyValue<Integer, ArrayList<UserEvent>> entry = iterator.next();
BulkRequest bulkRequest = new BulkRequest(entry.key, entry.value);
if (bulkRequest.getLocation() != null) {
URI url = bulkClient.buildURIPath(bulkRequest);
try {
bulkClient.postRequestBulkApi(url, bulkRequest);
keyValueStore.delete(entry.key);
} catch (BulkApiException e) {
logger.warn(e.getMessage(), e.fillInStackTrace());
}
}
}
iterator.close();
count = 0;
}
@Override
public void close() {
}
}
В настоящее время в моем коде, если вызов API завершается неудачно, он будет повторять следующие 100 (и это будет происходить до тех пор, пока он не будет работать) и добавлять их в keyValueStore
. Я не хочу, чтобы это случилось. Вместо этого я бы предпочел остановить поток и продолжить после очистки keyValueStore
. Возможно ли это?
Могу ли я выбросить StreamsException
?
try {
bulkClient.postRequestBulkApi(url, bulkRequest);
keyValueStore.delete(entry.key);
} catch (BulkApiException e) {
throw new StreamsException(e);
}
Это убило бы мое потоковое приложение, и процесс умирает?