В hdfs для подключения Kafka у нас есть следующий класс SequenceFileWriter.java для записи сообщений kafka в SequenceFileFormat.
import java.io.IOException;
import io.confluent.connect.avro.AvroData;
import io.confluent.connect.hdfs.RecordWriter;
import io.confluent.connect.hdfs.RecordWriterProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.kafka.connect.sink.SinkRecord;
/**
* Provider of a Sequence File record writer.
*/
public class SequenceFileWriterProvider implements RecordWriterProvider
{
public String getExtension() {
return "";
}
@Override
public RecordWriter<SinkRecord> getRecordWriter(Configuration conf, String fileName, SinkRecord record, AvroData avroData) throws IOException {
Path path = new Path(fileName);
final SequenceFile.Writer writer;
SequenceFile.Writer.Option optPath = SequenceFile.Writer.file(path);
SequenceFile.Writer.Option optKey = SequenceFile.Writer.keyClass(LongWritable.class);
SequenceFile.Writer.Option optVal = SequenceFile.Writer.valueClass(Text.class);
SequenceFile.Writer.Option optCodec = SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new BZip2Codec());
writer = SequenceFile.createWriter(conf, optPath, optKey, optVal, optCodec);
return new RecordWriter<SinkRecord>() {
@Override
public void write(SinkRecord record) throws IOException {
writer.append(
new LongWritable(System.currentTimeMillis()),
new Text((byte[]) record.value())
);
}
@Override
public void close() throws IOException {
writer.close();
}
};
}
}
Мы запускаем confluent 5.0.0 в док-контейнере, управляемом kubernetes. Мы заметили, что когда мы удаляем контроллер репликации в k8s, работающем с соединителем kafka, и воссоздаем контроллер репликации, некоторые файлы последовательности повреждаются. У нас есть искровое задание, которое читает эти данные с помощью SequenceFileReader и получает исключение EOFException, указанное ниже. Также заметил, что есть два дополнительных байта, которые появляются в конце файла. Мы предполагаем, что есть проблема с SequenceFileWriter и нам нужна помощь в проверке Writer. Любая помощь будет оценена. Спасибо.
java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.apache.hadoop.io.WritableUtils.readVLong(WritableUtils.java:308)
at org.apache.hadoop.io.WritableUtils.readVInt(WritableUtils.java:329)
at org.apache.hadoop.io.SequenceFile$Reader.readBuffer(SequenceFile.java:2160)
at org.apache.hadoop.io.SequenceFile$Reader.seekToCurrentValue(SequenceFile.java:2227)
at org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:2263)
at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2394)
at badSequenceFile.readSequenceFile(badSequenceFile.java:27)
at badSequenceFile.main(badSequenceFile.java:345)
Примечание. Когда мы удаляем временные файлы соединителя (+ tmp) перед запуском контроллера репликации k8s, соединитель начинает очищаться и не создает плохие файлы.