У меня есть приложение, которое получает большое количество запросов GET (около 250000 за 5 минут).Приложение анализирует параметры запроса и публикует их в Kafka.Код для публикации выглядит следующим образом:
public class KafkaProcessor {
private static final String BATCH_SIZE = "batch.size";
private static final String REQUEST_REQUIRED_ACKS = "request.required.acks";
private static final String PRODUCER_TYPE = "producer.type";
private static final String VALUE_SERIALIZER = "value.serializer";
private static final String KEY_SERIALIZER = "key.serializer";
private static final String METADATA_BROKER_LIST = "bootstrap.servers";
private static final String MAX_BLOCK_MS = "max.block.ms";
private static final String KAFKA_ENABLED = "enabled";
private static Properties props = new Properties();
private static KafkaProducer<String, String> producer;
private static ProducerRecord<String, String> producerRecord;
private static String topic;
static {
boolean isEnabled = Boolean.parseBoolean(ResourceProps.INSTANCE.getKafkaProps(KAFKA_ENABLED));
if (isEnabled) {
//Setting up a producer configuration.
props.put(METADATA_BROKER_LIST, "x.x.x.x:9092,y.y.y.y:9092");
props.put(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer");
props.put(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer");
props.put(PRODUCER_TYPE, "async");
props.put(REQUEST_REQUIRED_ACKS, "1");
props.put(BATCH_SIZE, "1000");
props.put(MAX_BLOCK_MS, "10000");
producer = new KafkaProducer<>(props);
topic = "pixel-server";
}
}
private static void publishToKafka(JSONObject data) {
producerRecord = new ProducerRecord<String, String>(topic, data.toString());
producer.send(producerRecord, new Callback() {
@Override public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
}
}
});
}
}
Мое приложение размещено в экземпляре AWS.Сервер Kafka также находится на другом компьютере AWS.
Однако, если kafka не работает или kafka медленно реагирует по какой-либо причине, мое приложение зависает и не может обрабатывать какой-либо запрос дальше.Я хочу знать, как я могу сделать свое приложение независимым от Кафки, то есть, если кафка выходит из строя (или медленно реагирует), это не должно влиять на мое приложение.
Я пробовал несколько способов, например, еслиkafka выдает тайм-аут, затем подсчитывает число исключений тайм-аута и прекращает публикацию в kafka, но, поскольку объем запроса очень велик, поэтому к тому времени я получаю любое исключение тайм-аута, мое приложение зависает.
Любая помощь или указательбудет оценено.
Я использую Kafka 0.8.2.Мой сервер в Vertx.ОС используется в Ubuntu.Улимит установлены на макс.