Я читаю сообщения из подписки Pub/Sub, и это работает нормально; теперь мне нужно вставлять их в таблицу в ClickHouse.
Я учусь, пожалуйста, извините за опоздание.
// Build the pipeline with default options.
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

// Step 1: read each Pub/Sub message as a String from the subscription.
// PUBSUB_ID_ATTRIBUTE is the attribute name handed to withIdAttribute.
PCollection<String> inputFromPubSub =
    p.apply(
        namePrefix + "ReadFromPubSub",
        PubsubIO.readStrings()
            .fromSubscription("projects/*********/subscriptions/crypto_bitcoin.dataflow.bigquery.transactions")
            .withIdAttribute(PUBSUB_ID_ATTRIBUTE));

// Step 2: parse each message as a Transaction and emit a TransactionSmall.
// This step has its own name, distinct from the read step above, so that
// transform names stay unique within the pipeline.
PCollection<TransactionSmall> res =
    inputFromPubSub.apply(
        namePrefix + "ParseTransaction",
        ParDo.of(
            new DoFn<String, TransactionSmall>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                Transaction transaction = JsonUtils.parseJson(c.element(), Transaction.class);
                // The creation date is "now" at processing time; 123 is a hard-coded number.
                c.output(new TransactionSmall(new Date(), transaction.getHash(), 123));
              }
            }));

// Step 3: write the elements to ClickHouse.
// NOTE(review): ClickHouseIO.write appears to require the input PCollection to carry a
// Beam schema whose field names match the table columns; TransactionSmall as declared
// has no schema annotation -- confirm against the ClickHouseIO documentation.
res.apply(ClickHouseIO.<TransactionSmall>write("jdbc:clickhouse://**.**.**.**:8123/litecoin?password=*****", "****"));

// Run the pipeline and block until it finishes.
p.run().waitUntilFinish();
Моя TransactionSmall.java
import java.io.Serializable;
import java.util.Date;
/**
 * Immutable, serializable value holding a creation date, a hash string and a number.
 *
 * <p>The date is copied on the way in and on the way out, so later changes to the
 * caller's {@link Date} instance do not affect this object.
 *
 * <p>NOTE(review): the ClickHouse table shown alongside this class names its date
 * column {@code created_date}, while this field is {@code created_dt} -- confirm how
 * the two are meant to be mapped.
 */
public class TransactionSmall implements Serializable {
    private static final long serialVersionUID = 1L;

    // Field names are kept as originally declared.
    private final Date created_dt;
    private final String hash;
    private final int number;

    /**
     * Creates a new value.
     *
     * @param created_dt creation date; may be {@code null}; copied defensively
     * @param hash       hash string; may be {@code null}
     * @param number     numeric value stored as-is
     */
    public TransactionSmall(Date created_dt, String hash, int number) {
        this.created_dt = copyOf(created_dt);
        this.hash = hash;
        this.number = number;
    }

    /**
     * Returns a copy of the creation date.
     *
     * @return a fresh {@link Date}, or {@code null} if none was supplied
     */
    public Date getCreatedDt() {
        return copyOf(created_dt);
    }

    /**
     * Returns the hash string.
     *
     * @return the hash passed to the constructor
     */
    public String getHash() {
        return hash;
    }

    /**
     * Returns the stored number.
     *
     * @return the number passed to the constructor
     */
    public int getNumber() {
        return number;
    }

    /** Null-safe copy of a mutable {@link Date}. */
    private static Date copyOf(Date source) {
        return source == null ? null : new Date(source.getTime());
    }
}
Определение моей таблицы
clickhouse.us-east1-b.c.staging-btc-etl.internal :) CREATE TABLE litecoin.saurabh_blocks_small (`created_date` Date DEFAULT today(), `hash` String, `number` In) ENGINE = MergeTree(created_date, (hash, number), 8192)
CREATE TABLE litecoin.saurabh_blocks_small
(
`created_date` Date,
`hash` String,
`number` In
)
ENGINE = MergeTree(created_date, (hash, number), 8192)
Я получаю следующую ошибку:
java.lang.IllegalArgumentException: Type of @Element must match the DoFn typesaurabhReadFromPubSub2/ParMultiDo(Anonymous).output [PCollection]
at org.apache.beam.sdk.transforms.ParDo.getDoFnSchemaInformation (ParDo.java:577)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateParDo (ParDoTranslation.java:185)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation$ParDoTranslator.translate (ParDoTranslation.java:124)
at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:155)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.getParDoPayload (ParDoTranslation.java:650)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.isSplittable (ParDoTranslation.java:665)
at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformMatchers$6.matches (PTransformMatchers.java:269)
at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform (Pipeline.java:282)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:665)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:657)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:657)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:657)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600 (TransformHierarchy.java:317)
at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:251)
at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:460)
at org.apache.beam.sdk.Pipeline.replace (Pipeline.java:260)
at org.apache.beam.sdk.Pipeline.replaceAll (Pipeline.java:210)
at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:170)
at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:315)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:301)
at io.blockchainetl.bitcoin.Trail.main (Trail.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:748)
Какой способ будет лучшим и самым чистым, чтобы достичь этого без явного создания объектов?
Спасибо