SpannerIO java .lang.IllegalStateException: сортировщик должен быть нулевым здесь - PullRequest
1 голос
/ 12 марта 2020

Я пытаюсь записать в Spanner из потокового задания DataFlow с помощью

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>2.18.0</version>
</dependency>

После сопоставления данных с PCollection<Mutation> Я записываю их в Spanner с помощью SpannerIO.write

Pipeline pipeline = Pipeline.create(options);

PCollection<Mutation> mutations = pipeline.apply...

mutations.apply("WriteMutations", SpannerIO.write()
                .withInstanceId(INSTANCE_ID)
                .withDatabaseId(DATABASE_ID)
);

pipeline.run();

Однако он выдает

java.lang.IllegalStateException: Sorter should be null here
        at org.apache.beam.sdk.io.gcp.spanner.SpannerIO$GatherBundleAndSortFn.startBundle (SpannerIO.java:1080)

В чем причина этого исключения?

Следующий конвейер создает исключение. Я тестирую его с 20 рабочими, но похоже, что он не зависит от загрузки данных.

import com.google.cloud.spanner.Mutation;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

import java.util.UUID;

public final class TestPipeline {

    private static final Duration WINDOW_DURATION = Duration.standardSeconds(1);
    private static final String DATABASE_ID = "test";
    private static final String INSTANCE_ID = "test-spanner";
    private static final String TEST_TABLE = "test";

    public static void main(String[] args) {
        TestPipelineOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(TestPipelineOptions.class);

        Pipeline pipeline = Pipeline.create(options);

        pipeline
                .apply("Read pubsub", PubsubIO.readMessagesWithAttributes()
                        .fromSubscription(options.getInputSubscription()))
                .apply("Parse message", ParDo.of(new ProcessMessage()))
                .apply("Windowing", Window.<Mutation>into(new GlobalWindows())
                        .triggering(Repeatedly.forever(
                                AfterProcessingTime.pastFirstElementInPane()
                                        .plusDelayOf(WINDOW_DURATION)))
                        .withAllowedLateness(Duration.ZERO)
                        .discardingFiredPanes())
                .apply("Write mutations", SpannerIO.write()
                        .withInstanceId(INSTANCE_ID)
                        .withDatabaseId(DATABASE_ID)
                );

        pipeline.run();
    }

    private static class ProcessMessage extends DoFn<PubsubMessage, Mutation> {

        @ProcessElement
        public void processElement(@Element final PubsubMessage message,
                                   final OutputReceiver<Mutation> out) {
            out.output(Mutation.newInsertOrUpdateBuilder(TEST_TABLE)
                    .set("id").to(UUID.randomUUID().toString())
                    .set("string").to("test")
                    .set("count").to(Long.MAX_VALUE)
                    .build()
            );
        }
    }

    interface TestPipelineOptions extends DataflowPipelineOptions {

        void setInputSubscription(String inputSubscription);

        @Description("Google Pubsub subscription id.")
        String getInputSubscription();
    }

}

Таблица CREATE TABLE test (id STRING(50) NOT NULL, string STRING(50) NOT NULL, count INT64) PRIMARY KEY (id);

1 Ответ

1 голос
/ 16 марта 2020

Похоже, что эта проблема возникает с apache балкой версии 2.18, но не с версией 2.17.

Проблема с apache балкой версии 2.18 отслеживается здесь: https://issues.apache.org/jira/browse/BEAM-9505

...