kafka-connect-hdfs: SequenceFileWriter создает плохие файлы при перезапусках соединителя, вызывая EOFException в SequenceFileReader - PullRequest
0 голосов
/ 17 сентября 2018

В hdfs для подключения Kafka у нас есть следующий класс SequenceFileWriter.java для записи сообщений kafka в SequenceFileFormat.

import java.io.IOException;

import io.confluent.connect.avro.AvroData;
import io.confluent.connect.hdfs.RecordWriter;
import io.confluent.connect.hdfs.RecordWriterProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.kafka.connect.sink.SinkRecord;

/**
 * Provider of a Sequence File record writer.
 */
public class SequenceFileWriterProvider implements RecordWriterProvider
{
  public String getExtension() {
    return "";
  }

  @Override
  public RecordWriter<SinkRecord> getRecordWriter(Configuration conf, String fileName, SinkRecord record, AvroData avroData) throws IOException {
    Path path = new Path(fileName);

    final SequenceFile.Writer writer;
    SequenceFile.Writer.Option optPath = SequenceFile.Writer.file(path);
    SequenceFile.Writer.Option optKey = SequenceFile.Writer.keyClass(LongWritable.class);
    SequenceFile.Writer.Option optVal = SequenceFile.Writer.valueClass(Text.class);
    SequenceFile.Writer.Option optCodec = SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new BZip2Codec());
    writer = SequenceFile.createWriter(conf, optPath, optKey, optVal, optCodec);

    return new RecordWriter<SinkRecord>() {
      @Override
      public void write(SinkRecord record) throws IOException {
        writer.append(
            new LongWritable(System.currentTimeMillis()),
            new Text((byte[]) record.value())
        );
      }

      @Override
      public void close() throws IOException {
        writer.close();
      }
    };
  }
}

Мы запускаем confluent 5.0.0 в док-контейнере, управляемом kubernetes. Мы заметили, что когда мы удаляем контроллер репликации в k8s, работающем с соединителем kafka, и воссоздаем контроллер репликации, некоторые файлы последовательности повреждаются. У нас есть искровое задание, которое читает эти данные с помощью SequenceFileReader и получает исключение EOFException, указанное ниже. Также заметил, что есть два дополнительных байта, которые появляются в конце файла. Мы предполагаем, что есть проблема с SequenceFileWriter и нам нужна помощь в проверке Writer. Любая помощь будет оценена. Спасибо.

java.io.EOFException
    at java.io.DataInputStream.readByte(DataInputStream.java:267)
    at org.apache.hadoop.io.WritableUtils.readVLong(WritableUtils.java:308)
    at org.apache.hadoop.io.WritableUtils.readVInt(WritableUtils.java:329)
    at org.apache.hadoop.io.SequenceFile$Reader.readBuffer(SequenceFile.java:2160)
    at org.apache.hadoop.io.SequenceFile$Reader.seekToCurrentValue(SequenceFile.java:2227)
    at org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:2263)
    at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2394)
    at badSequenceFile.readSequenceFile(badSequenceFile.java:27)
    at badSequenceFile.main(badSequenceFile.java:345)

Примечание. Когда мы удаляем временные файлы соединителя (+ tmp) перед запуском контроллера репликации k8s, соединитель начинает очищаться и не создает плохие файлы.

1 Ответ

0 голосов
/ 30 мая 2019

Изменение writer.append для обработки исключений, похоже, решило проблему не записи файлов с неверной последовательностью с неуместным маркером конца файла (EOF).Дополнительно также выполнено приведение типа значения записи в тип данных String из байта.

return new RecordWriter<SinkRecord>() {
  @Override
  public void write(SinkRecord record) {
      if (record != null) {
          byte[] text = (byte[]) record.value();
              try{
                  writer.append(
                          new LongWritable(System.currentTimeMillis()),
                          new Text(new String (text))
                  );

              } catch (Exception e) {
                  logger.error("Exception encounterd : "+e+" for text : "+text);
              }
          }
      }
  }

...