Коннектор раковины Kafka Connect S3 со странным поведением кастомного разделителя - PullRequest
0 голосов
/ 09 июля 2020


Я планирую использовать пользовательский разделитель на основе полей и времени для разделения моих данных в s3 следующим образом: / part_ знак равно /part_date=YYYY-MM-dd/part_hour=HH/....parquet.

Мой Partitioner работает нормально, в моем ведре S3 все как и ожидалось.

Проблема связан с производительностью приемника
У меня 400 КБ / с / брокер = ~ 1,2 МБ / с в моем входном topi c, а приемник работает с пиками и фиксирует небольшое количество записей.

Если я использую classi c TimeBasedPartitioner, введите здесь описание изображения

Итак, моя проблема, похоже, в моем пользовательском разделителе. Вот код:

package test;
import ...;

public final class FieldAndTimeBasedPartitioner<T> extends TimeBasedPartitioner<T> {

private static final Logger log = LoggerFactory.getLogger(FieldAndTimeBasedPartitioner.class);
private static final String FIELD_SUFFIX = "part_";
private static final String FIELD_SEP = "=";
private long partitionDurationMs;
private DateTimeFormatter formatter;
private TimestampExtractor timestampExtractor;
private PartitionFieldExtractor partitionFieldExtractor;

protected void init(long partitionDurationMs, String pathFormat, Locale locale, DateTimeZone timeZone, Map<String, Object> config) {

    this.delim = (String)config.get("directory.delim");
    this.partitionDurationMs = partitionDurationMs;

    try {
        this.formatter = getDateTimeFormatter(pathFormat, timeZone).withLocale(locale);
        this.timestampExtractor = this.newTimestampExtractor((String)config.get("timestamp.extractor"));
        this.timestampExtractor.configure(config);
        this.partitionFieldExtractor = new PartitionFieldExtractor((String)config.get("partition.field"));
    } catch (IllegalArgumentException e) {
        ConfigException ce = new ConfigException("path.format", pathFormat, e.getMessage());
        ce.initCause(e);
        throw ce;
    }
}

private static DateTimeFormatter getDateTimeFormatter(String str, DateTimeZone timeZone) {
    return DateTimeFormat.forPattern(str).withZone(timeZone);
}

public static long getPartition(long timeGranularityMs, long timestamp, DateTimeZone timeZone) {
    long adjustedTimestamp = timeZone.convertUTCToLocal(timestamp);
    long partitionedTime = adjustedTimestamp / timeGranularityMs * timeGranularityMs;
    return timeZone.convertLocalToUTC(partitionedTime, false);
}

public String encodePartition(SinkRecord sinkRecord, long nowInMillis) {
    final Long timestamp = this.timestampExtractor.extract(sinkRecord, nowInMillis);
    final String partitionField = this.partitionFieldExtractor.extract(sinkRecord);
    return this.encodedPartitionForFieldAndTime(sinkRecord, timestamp, partitionField);
}

public String encodePartition(SinkRecord sinkRecord) {
    final Long timestamp = this.timestampExtractor.extract(sinkRecord);
    final String partitionFieldValue = this.partitionFieldExtractor.extract(sinkRecord);
    return encodedPartitionForFieldAndTime(sinkRecord, timestamp, partitionFieldValue);
}

private String encodedPartitionForFieldAndTime(SinkRecord sinkRecord, Long timestamp, String partitionField) {

    if (timestamp == null) {
        String msg = "Unable to determine timestamp using timestamp.extractor " + this.timestampExtractor.getClass().getName() + " for record: " + sinkRecord;
        log.error(msg);
        throw new ConnectException(msg);
    } else if (partitionField == null) {
        String msg = "Unable to determine partition field using partition.field '" + partitionField  + "' for record: " + sinkRecord;
        log.error(msg);
        throw new ConnectException(msg);
    }  else {
        DateTime recordTime = new DateTime(getPartition(this.partitionDurationMs, timestamp.longValue(), this.formatter.getZone()));
        return this.FIELD_SUFFIX
                + config.get("partition.field")
                + this.FIELD_SEP
                + partitionField
                + this.delim
                + recordTime.toString(this.formatter);
    }
}

static class PartitionFieldExtractor {

    private final String fieldName;

    PartitionFieldExtractor(String fieldName) {
        this.fieldName = fieldName;
    }

    String extract(ConnectRecord<?> record) {
        Object value = record.value();
        if (value instanceof Struct) {
            Struct struct = (Struct)value;
            return (String) struct.get(fieldName);
        } else {
            FieldAndTimeBasedPartitioner.log.error("Value is not of Struct !");
            throw new PartitionException("Error encoding partition.");
        }
    }
}

public long getPartitionDurationMs() {
    return partitionDurationMs;
}

public TimestampExtractor getTimestampExtractor() {
    return timestampExtractor;
}
}

Это более или менее слияние FieldPartitioner и TimeBasedPartitioner.

Есть какие-нибудь подсказки о том, почему у меня плохая производительность при загрузке сообщений? При разделении с использованием поля в записи десериализация и извлечение данных из сообщения может вызвать эту проблему? Поскольку у меня около 80 различных значений полей, может ли это быть проблемой памяти, поскольку он будет поддерживать в 80 раз больше буферов в куче?

Спасибо за вашу помощь.

1 Ответ

0 голосов
/ 03 сентября 2020

FYI, проблема была в самом разделителе. Моему разделителю нужно было декодировать все сообщение и получить информацию. Поскольку у меня много сообщений, для обработки всех этих событий требуется время.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...