Bigtable Insert - поток данных - PullRequest
       8

Bigtable Insert - поток данных

0 голосов
/ 17 декабря 2018

Я пытаюсь построить конвейер потока данных для преобразования объекта AVRO, ниже приведен код

pipeline.apply("Read from Avro", AvroIO.readGenericRecords(schema).from("test.test"))
                .apply(ParDo.of(new TransformAvro()))
                .apply("Write to Bigtable", write);
        return pipeline.run();


Below is the TransformAvro DoFn function

  try {
                String fieldName = field.schema().getName();
                fieldNameByte = fieldName.getBytes("UTF-8");

                String value = String.valueOf(gen.get(fieldName));
                fieldValueByte = value.getBytes("UTF-8");
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            Mutation.SetCell setCell =
                    Mutation.SetCell.newBuilder()
                            .setFamilyName(COLUMN_FAMILY_NAME)
                            .setColumnQualifier(ByteString.copyFrom(fieldNameByte))
                            .setTimestampMicros(-1)
                            .setValue(ByteString.copyFrom(fieldValueByte))
                            .build();

        mutations.add(Mutation.newBuilder().setSetCell(setCell).build());

        }

         c.output(KV.of(ByteString.copyFrom("test".getBytes()), mutations.build()));

Мне нужна помощь в выяснении, как выбрать ключ строки, сейчас у меня просто статическая строка.. Мне нужно иметь возможность извлечь значение из записи и использовать его в качестве ключа строки и сохранить все остальные поля в виде большого двоичного объекта

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...