Beam Java SDK с TFRecord и сжатым GZIP - PullRequest
0 голосов
/ 12 сентября 2018

Мы часто используем Beam Java SDK (и Google Cloud Dataflow для запуска пакетных заданий), и мы заметили что-то странное (возможно, ошибку?), Когда пытались использовать TFRecordIO с Compression.GZIP. Мы смогли придумать пример кода, который может воспроизвести ошибки, с которыми мы сталкиваемся.

Для ясности, мы используем Beam Java SDK 2.4.

Предположим, у нас есть PCollection<byte[]>, который может быть ПК с прототипами сообщений, например, в формате byte []. Обычно мы записываем это в GCS (Google Cloud Storage), используя кодировку Base64 (строки с разделителями новой строки) или TFRecordIO (без сжатия). У нас не было проблем с чтением данных из GCS таким образом в течение очень длительного времени (2,5+ года для первого и ~ 1,5 года для второго).

Недавно мы попытались TFRecordIO с опцией Compression.GZIP, а иногда мы получаем исключение, так как данные считаются недействительными (при чтении). Сами данные (файлы gzip) не повреждены, и мы протестировали разные вещи и пришли к следующему выводу.

Когда byte[], который сжимается в TFRecordIO, выше определенного порога (я бы сказал, когда он равен или выше 8192), тогда TFRecordIO.read().withCompression(Compression.GZIP) не будет работать. В частности, он выдаст следующее исключение:

Exception in thread "main" java.lang.IllegalStateException: Invalid data
    at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
    at org.apache.beam.sdk.io.TFRecordIO$TFRecordCodec.read(TFRecordIO.java:642)
    at org.apache.beam.sdk.io.TFRecordIO$TFRecordSource$TFRecordReader.readNextRecord(TFRecordIO.java:526)
    at org.apache.beam.sdk.io.CompressedSource$CompressedReader.readNextRecord(CompressedSource.java:426)
    at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:473)
    at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:468)
    at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:261)
    at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.processElement(BoundedReadEvaluatorFactory.java:141)
    at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Это может быть легко воспроизведено, поэтому вы можете обратиться к коду в конце. Вы также увидите комментарии о длине байтового массива (поскольку я проверял различные размеры, я пришел к выводу, что 8192 - это магическое число).

Поэтому мне интересно, является ли это ошибкой или известной проблемой - я не смог найти ничего похожего в трекере ошибок Apache Beam здесь , но если есть другой форум / сайт, мне нужно проверьте, пожалуйста, дайте мне знать! Если это действительно ошибка, какой канал лучше всего сообщить об этом?


Следующий код может воспроизвести ошибку, которую мы имеем.

При успешном запуске (с параметрами 1, 39, 100) в конце будет показано следующее сообщение:

------------ counter metrics from CountDoFn
[counter]             plain_base64_proto_array_len: 8126
[counter]                    plain_base64_proto_in:   1
[counter]               plain_base64_proto_val_cnt:  39
[counter]              tfrecord_gz_proto_array_len: 8126
[counter]                     tfrecord_gz_proto_in:   1
[counter]                tfrecord_gz_proto_val_cnt:  39
[counter]          tfrecord_uncomp_proto_array_len: 8126
[counter]                 tfrecord_uncomp_proto_in:   1
[counter]            tfrecord_uncomp_proto_val_cnt:  39

С параметрами (1, 40, 100), которые увеличивают длину байтового массива до 8192, он выдаст указанное исключение.

Вы можете настроить параметры (внутри CreateRandomProtoData DoFn), чтобы понять, почему длина byte[] в зашифрованном виде имеет значение. Это может также помочь вам использовать следующий класс Java Proto-gen (для TestProto, используемого в основном коде выше. Вот оно: gist link

Ссылка: Основной код:

package exp.moloco.dataflow2.compression; // NOTE: Change appropriately.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.TreeMap;

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TFRecordIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.codec.binary.Base64;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.protobuf.InvalidProtocolBufferException;

import com.moloco.dataflow.test.StackOverflow.TestProto;
import com.moloco.dataflow2.Main;

// @formatter:off
// This code uses TestProto (java class) that is generated by protoc.
// The message definition is as follows (in proto3, but it shouldn't matter):
// message TestProto {
//   int64 count = 1;
//   string name = 2;
//   repeated string values = 3;
// }
// Note that this code does not depend on whether this proto is used,
// or any other byte[] is used (see CreateRandomData DoFn later which generates the data being used in the code).
// We tested both, but are presenting this as a concrete example of how (our) code in production can be affected.
// @formatter:on

public class CompressionTester {
  private static final Logger LOG = LoggerFactory.getLogger(CompressionTester.class);

  static final List<String> lines = Arrays.asList("some dummy string that will not used in this job.");

  // Some GCS buckets where data will be written to.
  // %s will be replaced by some timestamped String for easy debugging.
  static final String PATH_TO_GCS_PLAIN_BASE64 = Main.SOME_BUCKET + "/comp-test/%s/output-plain-base64";
  static final String PATH_TO_GCS_TFRECORD_UNCOMP = Main.SOME_BUCKET + "/comp-test/%s/output-tfrecord-uncompressed";
  static final String PATH_TO_GCS_TFRECORD_GZ = Main.SOME_BUCKET + "/comp-test/%s/output-tfrecord-gzip";

  // This DoFn reads byte[] which represents a proto message (TestProto).
  // It simply counts the number of proto objects it processes
  // as well as the number of Strings each proto object contains.
  // When the pipeline terminates, the values of the Counters will be printed out.
  static class CountDoFn extends DoFn<byte[], TestProto> {

    private final Counter protoIn;
    private final Counter protoValuesCnt;
    private final Counter protoByteArrayLength;

    public CountDoFn(String name) {
      protoIn = Metrics.counter(this.getClass(), name + "_proto_in");
      protoValuesCnt = Metrics.counter(this.getClass(), name + "_proto_val_cnt");
      protoByteArrayLength = Metrics.counter(this.getClass(), name + "_proto_array_len");
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws InvalidProtocolBufferException {
      protoIn.inc();
      TestProto tp = TestProto.parseFrom(c.element());
      protoValuesCnt.inc(tp.getValuesCount());
      protoByteArrayLength.inc(c.element().length);
    }
  }

  // This DoFn emits a number of TestProto objects as byte[].
  // Input to this DoFn is ignored (not used).
  // Each TestProto object contains three fields: count (int64), name (string), and values (repeated string).
  // The three parameters in DoFn determines
  // (1) the number of proto objects to be generated,
  // (2) the number of (repeated) strings to be added to each proto object, and
  // (3) the length of (each) string.
  // TFRecord with Compression (when reading) fails when the parameters are 1, 40, 100, for instance.
  // TFRecord with Compression (when reading) succeeds when the parameters are 1, 39, 100, for instance.
  static class CreateRandomProtoData extends DoFn<String, byte[]> {

    static final int NUM_PROTOS = 1; // Total number of TestProto objects to be emitted by this DoFn.
    static final int NUM_STRINGS = 40; // Total number of strings in each TestProto object ('repeated string').
    static final int STRING_LEN = 100; // Length of each string object.

    // Returns a random string of length len.
    // For debugging purposes, the string only contains upper-case English alphabets.
    static String getRandomString(Random rd, int len) {
      StringBuffer sb = new StringBuffer();
      for (int i = 0; i < len; i++) {
        sb.append('A' + (rd.nextInt(26)));
      }
      return sb.toString();
    }

    // Returns a randomly generated TestProto object.
    // Each string is generated randomly using getRandomString().
    static TestProto getRandomProto(Random rd) {
      TestProto.Builder tpBuilder = TestProto.newBuilder();

      tpBuilder.setCount(rd.nextInt());
      tpBuilder.setName(getRandomString(rd, STRING_LEN));
      for (int i = 0; i < NUM_STRINGS; i++) {
        tpBuilder.addValues(getRandomString(rd, STRING_LEN));
      }

      return tpBuilder.build();
    }

    // Emits TestProto objects are byte[].
    @ProcessElement
    public void processElement(ProcessContext c) {
      // For debugging purposes, we set the seed here.
      Random rd = new Random();
      rd.setSeed(132475);

      for (int n = 0; n < NUM_PROTOS; n++) {
        byte[] data = getRandomProto(rd).toByteArray();
        c.output(data);
        // With parameters (1, 39, 100), the array length is 8126. It works fine.
        // With parameters (1, 40, 100), the array length is 8329. It breaks TFRecord with GZIP.
        System.out.println("\n--------------------------\n");
        System.out.println("byte array length = " + data.length);
        System.out.println("\n--------------------------\n");
      }
    }
  }

  public static void execute() {
    PipelineOptions options = PipelineOptionsFactory.create();
    options.setJobName("compression-tester");
    options.setRunner(DirectRunner.class);

    // For debugging purposes, write files under 'gcsSubDir' so we can easily distinguish.
    final String gcsSubDir =
        String.format("%s-%d", DateTime.now(DateTimeZone.UTC), DateTime.now(DateTimeZone.UTC).getMillis());

    // Write PCollection<TestProto> in 3 different ways to GCS.
    {
      Pipeline pipeline = Pipeline.create(options);

      // Create dummy data which is a PCollection of byte arrays (each array representing a proto message).
      PCollection<byte[]> data = pipeline.apply(Create.of(lines)).apply(ParDo.of(new CreateRandomProtoData()));

      // 1. Write as plain-text with base64 encoding.
      data.apply(ParDo.of(new DoFn<byte[], String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(new String(Base64.encodeBase64(c.element())));
        }
      })).apply(TextIO.write().to(String.format(PATH_TO_GCS_PLAIN_BASE64, gcsSubDir)).withNumShards(1));

      // 2. Write as TFRecord.
      data.apply(TFRecordIO.write().to(String.format(PATH_TO_GCS_TFRECORD_UNCOMP, gcsSubDir)).withNumShards(1));

      // 3. Write as TFRecord-gzip.
      data.apply(TFRecordIO.write().withCompression(Compression.GZIP)
          .to(String.format(PATH_TO_GCS_TFRECORD_GZ, gcsSubDir)).withNumShards(1));

      pipeline.run().waitUntilFinish();
    }

    LOG.info("-------------------------------------------");
    LOG.info("               READ TEST BEGINS ");
    LOG.info("-------------------------------------------");

    // Read PCollection<TestProto> in 3 different ways from GCS.
    {
      Pipeline pipeline = Pipeline.create(options);

      // 1. Read as plain-text.
      pipeline.apply(TextIO.read().from(String.format(PATH_TO_GCS_PLAIN_BASE64, gcsSubDir) + "*"))
          .apply(ParDo.of(new DoFn<String, byte[]>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              c.output(Base64.decodeBase64(c.element()));
            }
          })).apply("plain-base64", ParDo.of(new CountDoFn("plain_base64")));

      // 2. Read as TFRecord -> byte array.
      pipeline.apply(TFRecordIO.read().from(String.format(PATH_TO_GCS_TFRECORD_UNCOMP, gcsSubDir) + "*"))
          .apply("tfrecord-uncomp", ParDo.of(new CountDoFn("tfrecord_uncomp")));

      // 3. Read as TFRecord-gz -> byte array.
      // This seems to fail when 'data size' becomes large.
      pipeline
          .apply(TFRecordIO.read().withCompression(Compression.GZIP)
              .from(String.format(PATH_TO_GCS_TFRECORD_GZ, gcsSubDir) + "*"))
          .apply("tfrecord_gz", ParDo.of(new CountDoFn("tfrecord_gz")));

      // 4. Run pipeline.
      PipelineResult res = pipeline.run();
      res.waitUntilFinish();

      // Check CountDoFn's metrics.
      // The numbers should match.
      Map<String, Long> counterValues = new TreeMap<String, Long>();
      for (MetricResult<Long> counter : res.metrics().queryMetrics(MetricsFilter.builder().build()).counters()) {
        counterValues.put(counter.name().name(), counter.committed());
      }
      StringBuffer sb = new StringBuffer();
      sb.append("\n------------ counter metrics from CountDoFn\n");
      for (Entry<String, Long> entry : counterValues.entrySet()) {
        sb.append(String.format("[counter] %40s: %5d\n", entry.getKey(), entry.getValue()));
      }
      LOG.info(sb.toString());
    }
  }
}

1 Ответ

0 голосов
/ 17 сентября 2018

Это похоже на ошибку в TFRecordIO.Channel.read() может прочитать меньше байтов, чем емкость входного буфера.8192 кажется размером буфера в GzipCompressorInputStream.Я подал https://issues.apache.org/jira/browse/BEAM-5412.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...