Я планирую использовать пользовательский разделитель на основе полей и времени для разделения моих данных в s3 следующим образом: / part_ знак равно /part_date=YYYY-MM-dd/part_hour=HH/....parquet.
Мой Partitioner работает нормально, в моем ведре S3 все как и ожидалось.
Проблема связан с производительностью приемника
У меня 400 КБ / с / брокер = ~ 1,2 МБ / с в моем входном topi c, а приемник работает с пиками и фиксирует небольшое количество записей.
Если я использую classi c TimeBasedPartitioner, введите здесь описание изображения
Итак, моя проблема, похоже, в моем пользовательском разделителе. Вот код:
package test;
import ...;
public final class FieldAndTimeBasedPartitioner<T> extends TimeBasedPartitioner<T> {
private static final Logger log = LoggerFactory.getLogger(FieldAndTimeBasedPartitioner.class);
private static final String FIELD_SUFFIX = "part_";
private static final String FIELD_SEP = "=";
private long partitionDurationMs;
private DateTimeFormatter formatter;
private TimestampExtractor timestampExtractor;
private PartitionFieldExtractor partitionFieldExtractor;
protected void init(long partitionDurationMs, String pathFormat, Locale locale, DateTimeZone timeZone, Map<String, Object> config) {
this.delim = (String)config.get("directory.delim");
this.partitionDurationMs = partitionDurationMs;
try {
this.formatter = getDateTimeFormatter(pathFormat, timeZone).withLocale(locale);
this.timestampExtractor = this.newTimestampExtractor((String)config.get("timestamp.extractor"));
this.timestampExtractor.configure(config);
this.partitionFieldExtractor = new PartitionFieldExtractor((String)config.get("partition.field"));
} catch (IllegalArgumentException e) {
ConfigException ce = new ConfigException("path.format", pathFormat, e.getMessage());
ce.initCause(e);
throw ce;
}
}
private static DateTimeFormatter getDateTimeFormatter(String str, DateTimeZone timeZone) {
return DateTimeFormat.forPattern(str).withZone(timeZone);
}
public static long getPartition(long timeGranularityMs, long timestamp, DateTimeZone timeZone) {
long adjustedTimestamp = timeZone.convertUTCToLocal(timestamp);
long partitionedTime = adjustedTimestamp / timeGranularityMs * timeGranularityMs;
return timeZone.convertLocalToUTC(partitionedTime, false);
}
public String encodePartition(SinkRecord sinkRecord, long nowInMillis) {
final Long timestamp = this.timestampExtractor.extract(sinkRecord, nowInMillis);
final String partitionField = this.partitionFieldExtractor.extract(sinkRecord);
return this.encodedPartitionForFieldAndTime(sinkRecord, timestamp, partitionField);
}
public String encodePartition(SinkRecord sinkRecord) {
final Long timestamp = this.timestampExtractor.extract(sinkRecord);
final String partitionFieldValue = this.partitionFieldExtractor.extract(sinkRecord);
return encodedPartitionForFieldAndTime(sinkRecord, timestamp, partitionFieldValue);
}
private String encodedPartitionForFieldAndTime(SinkRecord sinkRecord, Long timestamp, String partitionField) {
if (timestamp == null) {
String msg = "Unable to determine timestamp using timestamp.extractor " + this.timestampExtractor.getClass().getName() + " for record: " + sinkRecord;
log.error(msg);
throw new ConnectException(msg);
} else if (partitionField == null) {
String msg = "Unable to determine partition field using partition.field '" + partitionField + "' for record: " + sinkRecord;
log.error(msg);
throw new ConnectException(msg);
} else {
DateTime recordTime = new DateTime(getPartition(this.partitionDurationMs, timestamp.longValue(), this.formatter.getZone()));
return this.FIELD_SUFFIX
+ config.get("partition.field")
+ this.FIELD_SEP
+ partitionField
+ this.delim
+ recordTime.toString(this.formatter);
}
}
static class PartitionFieldExtractor {
private final String fieldName;
PartitionFieldExtractor(String fieldName) {
this.fieldName = fieldName;
}
String extract(ConnectRecord<?> record) {
Object value = record.value();
if (value instanceof Struct) {
Struct struct = (Struct)value;
return (String) struct.get(fieldName);
} else {
FieldAndTimeBasedPartitioner.log.error("Value is not of Struct !");
throw new PartitionException("Error encoding partition.");
}
}
}
public long getPartitionDurationMs() {
return partitionDurationMs;
}
public TimestampExtractor getTimestampExtractor() {
return timestampExtractor;
}
}
Это более или менее слияние FieldPartitioner и TimeBasedPartitioner.
Есть какие-нибудь подсказки о том, почему у меня плохая производительность при загрузке сообщений? При разделении с использованием поля в записи десериализация и извлечение данных из сообщения может вызвать эту проблему? Поскольку у меня около 80 различных значений полей, может ли это быть проблемой памяти, поскольку он будет поддерживать в 80 раз больше буферов в куче?
Спасибо за вашу помощь.