Какая лучшая практика для Kafka Streaming в качестве замены ETL? - PullRequest
0 голосов
/ 16 июня 2019

Я новичок в kafka и сейчас смотрю на Kafka Streams, особенно объединяю два потока.

Примеры, которые я просматривал, работали с довольно простыми сообщениями / текстовыми сообщениями.Поэтому я построил еще один простой пример, который больше относится к традиционному ETL.Допустим, у нас есть два «набора данных»: Contract (= Vertrag) и Cashflow, с количеством элементов от 1 до n.

В моем примере я создал тему для каждого и отправил объекты (Vertrag, Cashflow) вкаждый.

И мне удалось их первое объединение.

KStream<String, String> joined = srcVertrag.leftJoin(srcCashflow,
                (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
                JoinWindows.of(5000),
                Joined.with(
                  Serdes.String(), /* key */
                  Serdes.String(),   /* left value */
                  Serdes.String())  /* right value */
              );

Результат выглядит так:

left={"name":"Vertrag123","vertragId":"123"}, right={"buchungstag":1560715764709,"betrag":12.0,"vertragId":"123"}

Теперь мои вопросы:

  • это правильный способ сделать это?
  • Стоит ли вообще создавать Объекты или, вернее, обрабатывать только строки?

После ваших подсказок и дальнейших исследований я предложил следующий тест.- Я создал Pojos для «Vertrag» и «Cashflow» - я создал Serdes для каждого - я транслирую их как объекты - Наконец я пытаюсь объединить их в класс Wrapper.(и здесь я вешаюсь)

Я не нахожу образцы, которые делают что-то вроде этого.Это так экзотично?

package tki.bigdata.kafkaetl;

import java.time.Duration;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;

import tki.bigdata.domain.Cashflow;
import tki.bigdata.domain.Vertrag;
import tki.bigdata.serde.JsonPOJODeserializer;
import tki.bigdata.serde.JsonPOJOSerializer;



@ComponentScan(basePackages = { "tki.bigdata.domain", "tki.bigdata.config", "tki.bigdata.app" }, basePackageClasses = App.class)
@SpringBootApplication
@EnableScheduling
public class App implements CommandLineRunner {
    private static String bootstrapServers = "tobi0179.westeurope.cloudapp.azure.com:9092";

    @Autowired
    private KafkaTemplate<String, Object> template;

    // @Autowired
    // ExcelReader excelReader;

    public static void main(String[] args) {

        SpringApplication.run(App.class, args).close();
    }

    private void populateSampleData() {
        Vertrag v = new Vertrag();
        v.setVertragId("123");
        v.setName("Vertrag123");

        template.send("Vertrag", "123", v);
        //template.send("Vertrag", "124", "124;Vertrag12");

        Cashflow c = new Cashflow();
        c.setVertragId("123");
        c.setBetrag(12);
        c.setBuchungstag(new Date());
        template.send("Cashflow", "123", c);

    }

    //@Override
    public void run(String... args) throws Exception { 
        // Topics mit Demodata befüllen
        populateSampleData();

        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // TODO: the following can be removed with a serialization factory
        Map<String, Object> serdeProps = new HashMap<>();


        // prepare Serde for Vertrag
        final Serializer<Vertrag> vertragSerializer = new JsonPOJOSerializer<Vertrag>();
        serdeProps.put("JsonPOJOClass", Vertrag.class);
        vertragSerializer.configure(serdeProps, false);

        final Deserializer<Vertrag> vertragDeserializer = new JsonPOJODeserializer<Vertrag>();
        serdeProps.put("JsonPOJOClass", Vertrag.class);
        vertragDeserializer.configure(serdeProps, false);

        final Serde<Vertrag> vertragSerde = Serdes.serdeFrom(vertragSerializer, vertragDeserializer);

        // prepare Serde for Cashflow
        final Serializer<Cashflow> cashflowSerializer = new JsonPOJOSerializer<Cashflow>();
        serdeProps.put("JsonPOJOClass", Vertrag.class);
        cashflowSerializer.configure(serdeProps, false);

        final Deserializer<Cashflow> cashflowDeserializer = new JsonPOJODeserializer<Cashflow>();
        serdeProps.put("JsonPOJOClass", Vertrag.class);
        cashflowDeserializer.configure(serdeProps, false);

        final Serde<Cashflow> cashflowSerde = Serdes.serdeFrom(cashflowSerializer, cashflowDeserializer);



        // streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
        // TestUtils.tempDir().getAbsolutePath());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, Vertrag> srcVertrag = builder.stream("Vertrag");
        KStream<String, Cashflow> srcCashflow = builder.stream("Cashflow");

        // print to sysout
        //srcVertrag.print(Printed.toSysOut());



        KStream<String, MyValueContainer> joined = srcVertrag.leftJoin(srcCashflow,
                (leftValue, rightValue) -> new MyValueContainer(leftValue , rightValue), /* ValueJoiner */
                JoinWindows.of(600),
                Joined.with(
                  Serdes.String(), /* key */
                  vertragSerde,   /* left value */
                  cashflowSerde)  /* right value */
              );

        joined.to("Output");

        final Topology topology = builder.build();
        System.out.println(topology.describe());

        final KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);

        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);

    }
}

При выполнении выдает ошибку:

2019-06-17 22:18:31.892 ERROR 1599 --- [-StreamThread-1] o.a.k.s.p.i.AssignedStreamsTasks         : stream-thread [streams-pipe-0638d359-94df-43bd-9ef7-eb6769ed8a1c-StreamThread-1] Failed to process stream task 0_0 due to the following error:

java.lang.ClassCastException: java.lang.String cannot be cast to tki.bigdata.domain.Vertrag
    at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:98) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:63) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409) [kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:964) [kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832) [kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767) [kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736) [kafka-streams-2.0.1.jar:na]

1 Ответ

1 голос
/ 17 июня 2019

это правильный способ сделать это?

Да.

Должен ли я вообще создавать объекты или, вернее, обрабатывать только строки?

Да. Посмотрите на Avro как на хороший пример формата данных для сериализации / десериализации ваших pojos. Здесь вы ищете Avro "serde" (сериализатор / десериализатор). Например, Confluent предоставляет Avro serde для KStreams (этот serde требует использования реестра Confluent Schema).

что мне делать с приведенным выше результатом?

Мне неясно, каков ваш вопрос.

...