Повторяющиеся записи появляются в реализации oaspark.sql.sources.v2.writer.DataWriter.writeRecord? - PullRequest
0 голосов
/ 23 февраля 2019

В настоящее время мы изучаем Apache Spark (с Hadoop) для выполнения крупномасштабного преобразования данных (в Java).

Мы используем новые (и экспериментальные) интерфейсы DataSourceV2 для создания наших пользовательских файлов выходных данных.Компонентом этого является реализация интерфейса org.apache.spark.sql.sources.v2.writer.DataWriter.Все работает прекрасно, за исключением одной проблемы:

Метод org.apache.spark.sql.sources.v2.writer.DataWriter.write(record) часто (но не всегда) вызывается дважды для одной и той же входной записи.

Здесья надеюсь, что вам достаточно кода, чтобы понять суть того, что мы делаем:

В основном у нас есть много больших наборов входных данных, которые мы помещаем через приложение Spark в таблицы Hadoop, используя код, который выглядит примерно так::

  final Dataset<Row> jdbcTableDataset = sparkSession.read()
    .format("jdbc")
    .option("url", sqlServerUrl)
    .option("dbtable", tableName)
    .option("user", jdbcUser)
    .option("password", jdbcPassword)
    .load();

  final DataFrameWriter<Row> dataFrameWriter = jdbcTableDataset.write();
  dataFrameWriter.save(hdfsDestination + "/" + tableName);

Примерно пятьдесят таких таблиц, для чего это стоит.Я знаю, что в данных нет дубликатов, потому что dataFrameWriter.count() и dataFrameWriter.distinct().count() возвращают одно и то же значение.

Процесс преобразования включает выполнение операций соединения с этими таблицами и запись результата в файлы в (общем)файловая система в произвольном формате.Результирующие строки содержат уникальный ключ, столбец dataGroup, столбец dataSubGroup и около 40 других столбцов.Выбранные записи упорядочены по dataGroup, dataSubGroup и ключу.

Каждый выходной файл отличается столбцом dataGroup, который используется для разделения операции write:

  final Dataset<Row> selectedData = dataSelector.selectData();
  selectedData
    .write()
    .partitionBy("dataGroup")
    .format("au.com.mycompany.myformat.DefaultSource")
    .save("/path/to/shared/directory/");

Чтобы дать вам представление о масштабе, полученные в результате выбранные данные состоят из пятидесяти шестидесяти миллионов записей, неравномерно распределенных между примерно 3000 dataGroup файлами.Большой, но не огромный.

partitionBy("dataGroup") аккуратно гарантирует, что каждый файл dataGroup обрабатывается одним исполнителем.Пока все хорошо.

Мой источник данных реализует новый (и экспериментальный) интерфейс DataSourceV2:

package au.com.mycompany.myformat;

import java.io.Serializable;
import java.util.Optional;

import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultSource implements DataSourceRegister, WriteSupport , Serializable {

    private static final Logger logger = LoggerFactory.getLogger(DefaultSource.class);

    public DefaultSource() {
        logger.info("created");
    }

    @Override
    public String shortName() {
        logger.info("shortName");
        return "myformat";
    }

    @Override
    public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options) {
        return Optional.of(new MyFormatSourceWriter(writeUUID, schema, mode, options));
    }

}

Существует реализация DataSourceWriter:

public class MyFormatSourceWriter implements DataSourceWriter, Serializable {

  ...

}

иDataSourceWriterFactory реализация:

public class MyDataWriterFactory implements DataWriterFactory<InternalRow> {

  ...

}

и, наконец, DataWriter реализация.Кажется, что DataWriter создается и отправляется каждому исполнителю.Поэтому каждый DataWriter будет обрабатывать многие группы данных.

Каждая запись имеет уникальный ключевой столбец.

public class MyDataWriter implements DataWriter<InternalRow>, Serializable {

  private static final Logger logger = LoggerFactory.getLogger(XcdDataWriter.class);

  ...

  MyDataWriter(File buildDirectory, StructType schema, int partitionId) {
      this.buildDirectory = buildDirectory;
      this.schema = schema;
      this.partitionId = partitionId;
      logger.debug("Created MyDataWriter for partition {}", partitionId);
  }

  private String getFieldByName(InternalRow row, String fieldName) {
      return Optional.ofNullable(row.getUTF8String(schema.fieldIndex(fieldName)))
        .orElse(UTF8String.EMPTY_UTF8)
        .toString();
  }

  /**
   * Rows are written here. Each row has a unique key column as well as a dataGroup
   * column. Right now we are frequently getting called with the same record twice.
   */ 
  @Override
  public void write(InternalRow record) throws IOException {
      String nextDataFileName = getFieldByName(record, "dataGroup") + ".myExt";

      // some non-trivial logic for determining the right output file
      ...

      // write the output record
        outputWriter.append(getFieldByName(row, "key")).append(',')
          .append(getFieldByName(row, "prodDate")).append(',')
          .append(getFieldByName(row, "nation")).append(',')
          .append(getFieldByName(row, "plant")).append(',')
        ...              

  }

  @Override
  public WriterCommitMessage commit() throws IOException {
      ...
      outputWriter.close();
      ...
      logger.debug("Committed partition {} with {} data files for zip file {} for a total of {} zip files",
        partitionId, dataFileCount, dataFileName, dataFileCount);
      return new MyWriterCommitMessage(partitionId, dataFileCount);
  }

  @Override
  public void abort() throws IOException {
      logger.error("Failed to collect data for schema: {}", schema);
      ...
  }

}

Сейчас я работаю над этим, отслеживая последний обработанный ключ и игнорируя дубликаты.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...