Я пытаюсь записать в Spanner из потокового задания DataFlow с помощью
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>2.18.0</version>
</dependency>
После сопоставления данных с PCollection<Mutation>
Я записываю их в Spanner с помощью SpannerIO.write
Pipeline pipeline = Pipeline.create(options);
PCollection<Mutation> mutations = pipeline.apply...
mutations.apply("WriteMutations", SpannerIO.write()
.withInstanceId(INSTANCE_ID)
.withDatabaseId(DATABASE_ID)
);
pipeline.run();
Однако он выдает
java.lang.IllegalStateException: Sorter should be null here
at org.apache.beam.sdk.io.gcp.spanner.SpannerIO$GatherBundleAndSortFn.startBundle (SpannerIO.java:1080)
В чем причина этого исключения?
Следующий конвейер создает исключение. Я тестирую его с 20 рабочими, но похоже, что он не зависит от загрузки данных.
import com.google.cloud.spanner.Mutation;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
import java.util.UUID;
public final class TestPipeline {
private static final Duration WINDOW_DURATION = Duration.standardSeconds(1);
private static final String DATABASE_ID = "test";
private static final String INSTANCE_ID = "test-spanner";
private static final String TEST_TABLE = "test";
public static void main(String[] args) {
TestPipelineOptions options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(TestPipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply("Read pubsub", PubsubIO.readMessagesWithAttributes()
.fromSubscription(options.getInputSubscription()))
.apply("Parse message", ParDo.of(new ProcessMessage()))
.apply("Windowing", Window.<Mutation>into(new GlobalWindows())
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(WINDOW_DURATION)))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes())
.apply("Write mutations", SpannerIO.write()
.withInstanceId(INSTANCE_ID)
.withDatabaseId(DATABASE_ID)
);
pipeline.run();
}
private static class ProcessMessage extends DoFn<PubsubMessage, Mutation> {
@ProcessElement
public void processElement(@Element final PubsubMessage message,
final OutputReceiver<Mutation> out) {
out.output(Mutation.newInsertOrUpdateBuilder(TEST_TABLE)
.set("id").to(UUID.randomUUID().toString())
.set("string").to("test")
.set("count").to(Long.MAX_VALUE)
.build()
);
}
}
interface TestPipelineOptions extends DataflowPipelineOptions {
void setInputSubscription(String inputSubscription);
@Description("Google Pubsub subscription id.")
String getInputSubscription();
}
}
Таблица CREATE TABLE test (id STRING(50) NOT NULL, string STRING(50) NOT NULL, count INT64) PRIMARY KEY (id);