Я получаю события от Кафки, обогащаю / фильтрую / трансформирую их в Spark и затем сохраняю их в ES.Я возвращаю смещения в Kafka
У меня есть два вопроса / проблемы:
(1) Моя текущая работа в Spark ОЧЕНЬ медленная
I50 разделов по теме и 20 исполнителей.Каждый исполнитель имеет 2 ядра и 4 г памяти каждый.У моего драйвера 8г памяти.Я потребляю 1000 событий / раздел / секунду, и мой пакетный интервал составляет 10 секунд.Это означает, что я потребляю 500000 событий за 10 секунд
Мой кластер ES выглядит следующим образом:
20 осколков / индекс
3 основных экземпляра c5.xlarge.elasticsearch
12 экземпляров m4.xlarge.elasticsearch
диск / узел = 1024 ГБ, поэтому всего 12 ТБ
И у меня огромные задержки при планировании и обработке
(2) Как я могу зафиксировать смещения на исполнителях?
В настоящее время я обогащаю / преобразовываю / фильтрую свои события на исполнителях, а затем отправляю все в ES, используя BulkRequest .Это синхронный процесс.Если я получаю положительный отзыв, я отправляю список смещений водителю.Если нет, я отправляю обратно пустой список.На драйвере я фиксирую смещения в Кафке.Я считаю, что должен быть способ, при котором я могу фиксировать смещения на исполнителях, но я не знаю, как передать kafka Stream исполнителям:
((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges, this::onComplete);
Это код для передачи смещений в Kafka, который требуетKafka Stream
Вот мой общий код:
kafkaStream.foreachRDD( // kafka topic
rdd -> { // runs on driver
rdd.cache();
String batchIdentifier =
Long.toHexString(Double.doubleToLongBits(Math.random()));
LOGGER.info("@@ [" + batchIdentifier + "] Starting batch ...");
Instant batchStart = Instant.now();
List<OffsetRange> offsetsToCommit =
rdd.mapPartitionsWithIndex( // kafka partition
(index, eventsIterator) -> { // runs on worker
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
LOGGER.info(
"@@ Consuming " + offsetRanges[index].count() + " events" + " partition: " + index
);
if (!eventsIterator.hasNext()) {
return Collections.emptyIterator();
}
// get single ES documents
List<SingleEventBaseDocument> eventList = getSingleEventBaseDocuments(eventsIterator);
// build request wrappers
List<InsertRequestWrapper> requestWrapperList = getRequestsToInsert(eventList, offsetRanges[index]);
LOGGER.info(
"@@ Processed " + offsetRanges[index].count() + " events" + " partition: " + index + " list size: " + eventList.size()
);
BulkResponse bulkItemResponses = elasticSearchRepository.addElasticSearchDocumentsSync(requestWrapperList);
if (!bulkItemResponses.hasFailures()) {
return Arrays.asList(offsetRanges).iterator();
}
elasticSearchRepository.close();
return Collections.emptyIterator();
},
true
).collect();
LOGGER.info(
"@@ [" + batchIdentifier + "] Collected all offsets in " + (Instant.now().toEpochMilli() - batchStart.toEpochMilli()) + "ms"
);
OffsetRange[] offsets = new OffsetRange[offsetsToCommit.size()];
for (int i = 0; i < offsets.length ; i++) {
offsets[i] = offsetsToCommit.get(i);
}
try {
offsetManagementMapper.commit(offsets);
} catch (Exception e) {
// ignore
}
LOGGER.info(
"@@ [" + batchIdentifier + "] Finished batch of " + offsetsToCommit.size() + " messages " +
"in " + (Instant.now().toEpochMilli() - batchStart.toEpochMilli()) + "ms"
);
rdd.unpersist();
});