Нужно вставить строки в clickhouseIO из Apache Beam (поток данных) - PullRequest
0 голосов
/ 24 сентября 2019

Я читаю из темы Pub / Sub, которая работает нормально, теперь мне нужно вставить в таблицу на clickHouse.

Я учусь, пожалуйста, извините за опоздание.


        PipelineOptions options = PipelineOptionsFactory.create();


        //PubSubToDatabasesPipelineOptions options;
        Pipeline p = Pipeline.create(options);

        PCollection<String> inputFromPubSub = p.apply(namePrefix + "ReadFromPubSub",
                PubsubIO.readStrings().fromSubscription("projects/*********/subscriptions/crypto_bitcoin.dataflow.bigquery.transactions").withIdAttribute(PUBSUB_ID_ATTRIBUTE));



        PCollection<TransactionSmall> res = inputFromPubSub.apply(namePrefix + "ReadFromPubSub", ParDo.of(new DoFn<String, TransactionSmall>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                String item = c.element();
                //System.out.print(item);
                Transaction transaction = JsonUtils.parseJson(item, Transaction.class);
                //System.out.print(transaction);
                c.output(new TransactionSmall(new Date(),transaction.getHash(), 123));
            }}));


        res.apply(ClickHouseIO.<TransactionSmall>write("jdbc:clickhouse://**.**.**.**:8123/litecoin?password=*****", "****"));

        p.run().waitUntilFinish();

Моя TransactionSmall.java

import java.io.Serializable;
import java.util.Date;

public class TransactionSmall implements Serializable {

    private Date created_dt;
    private String hash;

    private int number;

    public TransactionSmall(Date created_dt, String hash, int number) {
        this.created_dt = created_dt;
        this.hash = hash;
        this.number = number;
    }
}

Определение моей таблицы

clickhouse.us-east1-b.c.staging-btc-etl.internal :) CREATE TABLE litecoin.saurabh_blocks_small (`created_date` Date DEFAULT today(), `hash` String, `number` In) ENGINE = MergeTree(created_date, (hash, number), 8192)

CREATE TABLE litecoin.saurabh_blocks_small
(
    `created_date` Date, 
    `hash` String, 
    `number` In
)
ENGINE = MergeTree(created_date, (hash, number), 8192)

Я получаю сообщение об ошибке типа

java.lang.IllegalArgumentException: Type of @Element must match the DoFn typesaurabhReadFromPubSub2/ParMultiDo(Anonymous).output [PCollection]
    at org.apache.beam.sdk.transforms.ParDo.getDoFnSchemaInformation (ParDo.java:577)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateParDo (ParDoTranslation.java:185)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation$ParDoTranslator.translate (ParDoTranslation.java:124)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:155)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.getParDoPayload (ParDoTranslation.java:650)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.isSplittable (ParDoTranslation.java:665)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformMatchers$6.matches (PTransformMatchers.java:269)
    at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform (Pipeline.java:282)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:665)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600 (TransformHierarchy.java:317)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:251)
    at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:460)
    at org.apache.beam.sdk.Pipeline.replace (Pipeline.java:260)
    at org.apache.beam.sdk.Pipeline.replaceAll (Pipeline.java:210)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:170)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:315)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:301)
    at io.blockchainetl.bitcoin.Trail.main (Trail.java:74)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:748)


что будетбыть лучшим и самым чистым способом достичь этого без явного создания объектов?

Спасибо

1 Ответ

0 голосов
/ 24 сентября 2019

Это, вероятно, происходит, потому что Beam полагается на спецификацию кодера для PCollection, когда выводит схему для него.Кажется, что возникают проблемы с выводом схемы ввода для вашего преобразования ClickhouseIO.

Вы можете заставить Beam иметь схему, указав кодер с выводом схемы, такой как AvroCoder.Вы бы сделали:

@DefaultCoder(AvroCoder.class)
public class TransactionSmall implements Serializable {

    private Date created_dt;
    private String hash;

    private int number;

    public TransactionSmall(Date created_dt, String hash, int number) {
        this.created_dt = created_dt;
        this.hash = hash;
        this.number = number;
    }
}

Или вы также можете установить кодер для PCollection на вашем конвейере:

PCollection<TransactionSmall> res = inputFromPubSub.apply(namePrefix + "ReadFromPubSub", ParDo.of(new DoFn<String, TransactionSmall>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        String item = c.element();
        Transaction transaction = JsonUtils.parseJson(item, Transaction.class);
        c.output(new TransactionSmall(new Date(),transaction.getHash(), 123));
     }}))
    .setCoder(AvroCoder.of(TransactionSmall.class));


res.apply(ClickHouseIO.<TransactionSmall>write("jdbc:clickhouse://**.**.**.**:8123/litecoin?password=*****", "****"));
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...