API процессора: массовый запрос POST для событий, хранящихся в KeyValueStore - PullRequest
1 голос
/ 01 апреля 2020

Как предложено здесь { ссылка } Я использовал Processor API для хранения входящих запросов в KeyValueStore. Каждые 100 событий я хочу отправить POST Запрос. Поэтому я сделал это:

public class BulkProcessor implements Processor<byte[], UserEvent> {

    private KeyValueStore<Integer, ArrayList<UserEvent>> keyValueStore;

    private BulkAPIClient bulkClient;

    private String storeName;

    private ProcessorContext context;

    private int count;

    @Autowired
    public BulkProcessor(String storeName, BulkClient bulkClient) {
        this.storeName = storeName;
        this.bulkClient = bulkClient;
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        keyValueStore = (KeyValueStore<Integer, ArrayList<UserEvent>>) context.getStateStore(storeName);
        count = 0;
        // to check every 15 minutes if there are any remainders in the store that are not sent yet
        this.context.schedule(Duration.ofMinutes(15), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
            if (count > 0) {
                sendEntriesFromStore();
            }
        });
    }

    @Override
    public void process(byte[] key, UserEvent value) {
        int userGroupId = Integer.valueOf(value.getUserGroupId());
        ArrayList<UserEvent> userEventArrayList = keyValueStore.get(userGroupId);
        if (userEventArrayList == null) {
            userEventArrayList = new ArrayList<>();
        }
        userEventArrayList.add(value);
        keyValueStore.put(userGroupId, userEventArrayList);
        if (count == 100) {
            sendEntriesFromStore();
        }
    }

    private void sendEntriesFromStore() {
        KeyValueIterator<Integer, ArrayList<UserEvent>> iterator = keyValueStore.all();
        while (iterator.hasNext()) {
            KeyValue<Integer, ArrayList<UserEvent>> entry = iterator.next();
            keyValueStore.delete(entry.key);
            BulkRequest bulkRequest = new BulkRequest(entry.key, entry.value);
            if (bulkRequest.getLocation() != null) {
                URI url = bulkClient.buildURIPath(bulkRequest);
                bulkClient.postRequestBulkApi(url, bulkRequest);
            }
        }
        iterator.close();
        count = 0;
    }

    @Override
    public void close() {
    }
}

Я не уверен, является ли добавление count поточно-ориентированным или это правильный способ его реализации. В настоящее время я также читаю только с одного раздела. Итак, мои вопросы:

  • Является ли этот потокобезопасным?
  • Является ли это хорошим способом отправки массовых запросов POST в Processor API?

1 Ответ

2 голосов
/ 02 апреля 2020

Этот потокобезопасен?

Да, он безопасен для потоков. Каждый поток использует ProcessorSupplier для создания своего собственного Processor экземпляра / объекта.

Является ли это хорошим способом отправки массовых запросов POST в Processor API?

В целом выглядит хорошо для меня.

...