Мне нужно прочитать CSV-файл из Google Cloud Storage и загрузить его содержимое в Google Datastore. Ниже приведён фрагмент кода.
import com.google.datastore.v1.Entity;
import com.google.datastore.v1.Key;
import com.opencsv.CSVParser;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import javax.annotation.Nullable;
import java.util.UUID;
import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
import static
public class PipelineClass {
static class CreateEntitiesFn extends DoFn<String, Entity> {
private static final Logger LOG = LoggerFactory.getLogger(PipelineClass.class);
private static final long serialVersionUID = 1L;
private final String namespace;
private final String kind;
private final Key ancestorKey;
CreateEntitiesFn(String namespace, String kind) {
this.namespace = namespace;
this.kind = kind;
ancestorKey = makeAncestorKey(namespace, kind);
Entity makeEntity(String id, String group) {
Entity.Builder entityBuilder = Entity.newBuilder();
Key.Builder keyBuilder = makeKey(ancestorKey, kind,
if (namespace != null) {
return entityBuilder.build();
public void processElement(ProcessContext c) throws Exception {
CSVParser parser = new CSVParser();
String[] parts = parser.parseLine(c.element());
String id = parts[0];
String group = parts[1];
c.output(makeEntity(id, group));
static Key makeAncestorKey(@Nullable String namespace, String kind) {
Key.Builder keyBuilder = makeKey(kind, "root");
if (namespace != null) {
return keyBuilder.build();
public interface Options extends PipelineOptions {
@Description("Path of the file to read from and store to Cloud Datastore")
String getInput();
void setInput(String value);
@Description("Dataset ID to read from Cloud Datastore")
String getProject();
void setProject(String value);
@Description("Cloud Datastore Entity Kind")
String getKind();
void setKind(String value);
@Description("Dataset namespace")
String getNamespace();
void setNamespace(@Nullable String value);
@Description("Number of output shards")
int getNumShards();
void setNumShards(int value);
public static void main(String args[]) {
// PipelineOptionsFactory.register(Options.class);
Options options =
Pipeline p = Pipeline.create(options);
CreateEntitiesFn(options.getNamespace(), options.getKind())))
Я использую Eclipse для разработки кода. Ниже мой pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
~ Copyright (C) 2017 Google Inc.
~ Licensed under the Apache License, Version 2.0 (the "License"); you may not
~ use this file except in compliance with the License. You may obtain a copy of
~ the License at
~ http://www.apache.org/licenses/LICENSE-2.0
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
~ License for the specific language governing permissions and limitations under
~ the License.
<project xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<name>Sonatype OSS Repository Hosting</name>
<!-- Ensure that the Maven jar plugin runs before the Maven
shade plugin by listing the plugin higher within the file. -->
Configures `mvn package` to produce a bundled jar ("fat jar") for runners
that require this for job submission to a cluster.
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<!-- Adds a dependency on a specific version of the Dataflow SDK. -->
<!-- Dependencies below this line are specific dependencies needed by the examples code. -->
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<!-- Add slf4j API frontend binding with JUL backend -->
<!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
<!-- Hamcrest and JUnit are required dependencies of PAssert,
which is used in the main code of DebuggingWordCount example. -->
При запуске кода через Maven с помощью приведённой ниже команды:
mvn compile exec:java -e -Dexec.mainClass=com.dataflow1.PipelineClass -Dexec.args="--project=myproject-263315 --runner=DataflowRunner" -Pdataflow-runner
Я получаю следующее сообщение об ошибке:
Exception in thread "main" java.lang.IllegalArgumentException: Property [project] is marked with contradictory annotations. Found [[Default.String(value=myproject-263315) on my.dataflow1.PipelineClass$Options#getProject()], [Default.InstanceFactory(value=class org.apache.beam.sdk.extensions.gcp.options.GcpOptions$DefaultProjectFactory) on org.apache.beam.runners.dataflow.options.DataflowPipelineOptions#getProject()], [Default.InstanceFactory(value=class org.apache.beam.sdk.extensions.gcp.options.GcpOptions$DefaultProjectFactory) on org.apache.beam.sdk.extensions.gcp.options.GcpOptions#getProject()]].
Может ли кто-нибудь помочь мне это исправить? Я уже надолго застрял на этой проблеме. Буду благодарен за любую помощь.