Почему наличие сериализатора в локальном потоке приводит к утечке памяти у производителя кафки? - PullRequest
0 голосов
/ 09 февраля 2019

Рассмотрим следующую настройку

prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ThriftSerializer.class.getName());


public class ThriftSerializer implements Serializer<TBase> {
    private final ThreadLocal<TSerializer> serializer = new ThreadLocalTSerializer();

    @Override
    public void configure(Map map, boolean b) {

    }

    @Override
    public byte[] serialize(String s, TBase event) {
        try {
            return serializer.get().serialize(event);
        } catch (TException e) {
            return new byte[0];
        }
    }

    @Override
    public void close() {

    }
}

Приведенный выше код вызывает утечку памяти

Но я не понимаю, почему это произошло.Создает ли производитель kafka множество потоков, которые не умирают?

Если приведенный выше код заменить на

@Override
public byte[] serialize(String s, TBase event) {
    TSerializer serializer = new TSerializer();

    try {
        return serializer.serialize(event);
    } catch (TException e) {
        return new byte[0];
    }
}

Тогда утечка памяти исчезнет, ​​что имеет смысл, но для каждого события еесоздание нового объекта, который нужно собирать мусором, потенциально вызывающий давление gc, если пропускная способность высока

Может ли кто-нибудь указать мне направление в понимании этого поведения?

1 Ответ

0 голосов
/ 10 февраля 2019

Насколько мне известно, KafkaProducer является поточно-ориентированным, и совместное использование одного экземпляра производителя между потоками, как правило, будет быстрее, чем наличие нескольких экземпляров.

Но метод send является асинхронным (если вы не вызываете .get () в объекте Future, возвращаемом методом send, что не рекомендуется, таким образом вы будете ожидать каждую отправку и, следовательно, обрабатывать их синхронно).

Из-за документации производитель состоит из пулабуферного пространства, в котором хранятся записи, которые еще не были переданы на сервер, а также фоновый поток ввода-вывода, который отвечает за превращение этих записей в запросы и передачу их в кластер .Невозможность закрыть производителя после использования приведет к утечке этих ресурсов.

Похоже, что метод send фактически использует фоновый поток для преобразования вашей записи и отправки ее в кластер.

Вы действительно закрываетепроизводитель в конце?

    producer.flush();
    producer.close();

Метод close сериализатора вызывается, когда сеанс Kafka должен быть закрыт.Я предполагаю, что вы можете попытаться выполнить дополнительную очистку в методе close сериализатора или отметить его как пригодный для сбора мусора.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...