РЕДАКТИРОВАТЬ: Ой, я просто заметил, что вы просили Java DatastoreIO.v1.write
смотрит на Java-эквивалент WriteToDatastore
для меня, и в этом случае вы должны настроить свою сущность (включая вид) на предыдущем шаге. Проверьте CreateEntityFn
в этом примере. https://github.com/mbrukman/apache-beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java#L197
ОРИГИНАЛ:
Вот как я это делаю
import apache_beam
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper
class MakeEntity(object):
def __init__(self, project):
self._project = project
def make(self, element):
try:
entity = entity_pb2.Entity()
helper.add_key_path(entity.key, 'EntityKind', element['id'])
helper.add_properties(entity, {
"created": datetime.datetime.now(),
"email": unicode(element['email'],
"count": int(element['count'],
"amount": float(element['amount'],
})
return entity
except:
logging.error(traceback.format_exc())
raise
def build_pipeline(project, pipeline_options):
p = apache_beam.Pipeline(options=pipeline_options)
_ = \
(p
# other transforms
| 'create entity' >> apache_beam.Map(MakeEntity(project=project).make)
| 'write to datastore' >> WriteToDatastore(project=project))
return p
Редактировать # 2: Я изменил ваш код, чтобы более точно следовать примеру, на который я ссылался. Надеюсь, это работает
public class ModifyEntityKindFn extends DoFn<Entity, Entity> {
@ProcessElement
public void processElement(ProcessContext context) {
Key.Builder keyBuilder = makeKey(NEW_KIND, inputEntity.getKey());
keyBuilder.getPartitionIdBuilder().setNamespaceId(NEW_NAMESPACE);
Entity.Builder entityBuilder = Entity.newBuilder().setKey(keyBuilder.build());
entityBuilder.getMutableProperties().put("content", makeValue(context.element()).build());
context.output(entityBuilder.build());
}
}