Как дать доброе имя в DatastoreIO.v1.write - PullRequest
0 голосов
/ 04 июля 2018

Я пытаюсь прочитать данные из вида в хранилище данных Google, применить некоторые преобразования и выполнить обратную запись в другой вид. Я использую поток данных Google для достижения этой цели. Читая из Datastore, мы можем дать добро. Но не в состоянии дать добро во время письма. Как этого добиться.

1 Ответ

0 голосов
/ 04 июля 2018

РЕДАКТИРОВАТЬ: Ой, я просто заметил, что вы просили Java DatastoreIO.v1.write смотрит на Java-эквивалент WriteToDatastore для меня, и в этом случае вы должны настроить свою сущность (включая вид) на предыдущем шаге. Проверьте CreateEntityFn в этом примере. https://github.com/mbrukman/apache-beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java#L197

ОРИГИНАЛ:

Вот как я это делаю

import apache_beam
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper

class MakeEntity(object):
    def __init__(self, project):
        self._project = project

    def make(self, element):
        try:
            entity = entity_pb2.Entity()
            helper.add_key_path(entity.key, 'EntityKind', element['id'])
            helper.add_properties(entity, {
                "created": datetime.datetime.now(),
                "email": unicode(element['email'],
                "count": int(element['count'],
                "amount": float(element['amount'],
            })
            return entity
        except:
            logging.error(traceback.format_exc())
            raise

def build_pipeline(project, pipeline_options):
    p = apache_beam.Pipeline(options=pipeline_options)
    _ = \
        (p
         # other transforms
         | 'create entity' >> apache_beam.Map(MakeEntity(project=project).make)
         | 'write to datastore' >> WriteToDatastore(project=project))
    return p

Редактировать # 2: Я изменил ваш код, чтобы более точно следовать примеру, на который я ссылался. Надеюсь, это работает

public class ModifyEntityKindFn extends DoFn<Entity, Entity> { 

    @ProcessElement 
    public void processElement(ProcessContext context) { 

        Key.Builder keyBuilder = makeKey(NEW_KIND, inputEntity.getKey());
        keyBuilder.getPartitionIdBuilder().setNamespaceId(NEW_NAMESPACE); 
        Entity.Builder entityBuilder = Entity.newBuilder().setKey(keyBuilder.build()); 
        entityBuilder.getMutableProperties().put("content", makeValue(context.element()).build());
        context.output(entityBuilder.build()); 

    } 

} 
...