Чтение данных из нескольких тем Kafka (общий дизайн класса списка c) - PullRequest
0 голосов
/ 04 февраля 2020

Я пытаюсь изменить код бегущего Flink, чтобы он мог читать данные из нескольких тем Kafka и записывать их в разные папки HDFS соответственно и без объединения. У меня есть много методов Java и Scala generi c и инициации объекта generi c внутри метода основного процесса и отражения. Он работает правильно с одной схемой Avro, но когда я пытаюсь добавить неизвестное количество схем Avro, у меня возникает проблема с обобщениями и конструкциями отражения.

Как решить? Какой шаблон проектирования мне может помочь?

Модель (схема Avro) представлена ​​в Java классах.

    public enum Types implements MessageType {
    RECORD_1("record1", "01", Record1.getClassSchema(), Record1.class),
    RECORD_2("record2", "02", Record2.getClassSchema(), Record2.class);

    private String topicName;
    private String dataType;
    private Schema schema;
    private Class<? extends SpecificRecordBase> clazz;}



public class Record1 extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord 
{
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("???");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
... }

public class Record1 ...

Черта процесса с основными методами процесса.

import org.apache.avro.specific.SpecificRecordBase
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.Writer
import tests.{Record1, Record2, Types}
import scala.reflect.ClassTag

trait Converter[T] extends Serializable {
  def convertToModel(message: KafkaSourceType): T
}

trait FlinkRunner extends Serializable {

  val kafkaTopicToModelMapping: Map[String, Class[_ <: SpecificRecordBase]] =
    Map(
      "record_1" -> Types.RECORD_1.getClassType,
      "record_2" -> Types.RECORD_2.getClassType
    )

  def buildAvroSink1(path: String, writer1: Writer[Record1]): BucketingSink[Record1] = ???
  def buildAvroSink2(path: String, writer2: Writer[Record2]): BucketingSink[Record2] = ???

  def process(topicList: List[String], env: StreamExecutionEnvironment): Unit = {
    // producer kafka source building
    val clazz1: Class[Record1] = ClassTag(kafkaTopicToModelMapping(topicList.head)).runtimeClass.asInstanceOf[Class[Record1]]
    val clazz2: Class[Record2] = ClassTag(kafkaTopicToModelMapping(topicList.tail.head)).runtimeClass.asInstanceOf[Class[Record2]]
    // How to makes clazzes list from that val clazzes: List[Class[???]] = ???

    val avroTypeInfo1: TypeInformation[Record1] = TypeInformation.of(clazz1)
    val avroTypeInfo2: TypeInformation[Record2] = TypeInformation.of(clazz2)
    // How to makes clazzes list from that val avroTypeInfos = ???

    val stream: DataStream[KafkaSourceType] = ???

    // consumer avro paths building, it
    val converter1: Converter[Record1] = new Converter[Record1] {
      override def convertToModel(message: KafkaSourceType): Record1 = deserializeAvro[Record1](message.value)
    }
    val converter2: Converter[Record2] = new Converter[Record2] {
      override def convertToModel(message: KafkaSourceType): Record2 = deserializeAvro[Record2](message.value)
    }
      // How to makes converters list from that

    val outputResultStream1 = stream
      .filter(_.topic == topicList.head)
      .map(record => converter1.convertToModel(record))(avroTypeInfo1)

    val outputResultStream2 = stream
      .filter(_.topic == topicList.tail.head)
      .map(record => converter2.convertToModel(record))(avroTypeInfo2)

    val writer1 = new AvroSinkWriter[Record1](???)
    val writer2 = new AvroSinkWriter[Record2](???)

    // add sink and start process
  }
}

КАК ЕСТЬ В Кафке есть несколько разных тем. Версия Kafka 10.2 без Confluent. Каждая топика Kafka c работает только с одним классом схемы Avro, написанным на Java. Единственное задание Flink (записано в Scala) считывает только одну топи c, конвертирует с одной схемой Avro и записывает данные только в одну папку в HDFS. Имя, путь и имя выходной папки находятся в конфигурации. Например, есть 3 потока работ с параметрами:

Первый поток работ

--brokersAdress … 
--topic record1
--folderName  folder1
-- avroClassName Record1
--output C:/….
--jobName SingleTopic1 
--number_of_parallel 2
--number_of_task 1
--mainClass Runner 
….

Второй поток работ

--brokersAdress … 
--topic record1
--folderName  folder1
-- avroClassName Record1
--output C:/….
--jobName SingleTopic2 
--number_of_parallel 2
--number_of_task 1
--mainClass Runner 
….

Третий поток работ

TO BE Одно задание Flink может читать более одной темы Kafka, преобразовывать ее с использованием другой схемы Avro и записывать данные в разные папки без объединения. Например, я могу запустить только один поток работ, который будет выполнять ту же работу

--brokersAdress … 
--topic record1, record2, record3
--folderName  folder1, folder2, 
-- avroClassName Record1, Record2
--output C:/….
--jobName MultipleTopics 
--number_of_parallel 3
--number_of_task 3
--mainClass Runner
...

Хорошо, спасибо. Есть несколько вопросов об организации кода: 1) Как обобщить переменные в параметрах метода и методов (называемых процедурами), чтобы позволить инициировать Список из нескольких строк с унаследованными от классов SpecificRecordBase? Если возможно, конечно.

val clazz1: Class[Record1] = ClassTag(kafkaTopicToModelMapping(topicList.head)).runtimeClass.asInstanceOf[Class[Record1]]
val clazz2: Class[Record2] = ClassTag(kafkaTopicToModelMapping(topicList.tail.head)).runtimeClass.asInstanceOf[Class[Record2]]

2) Тот же вопрос для avroTypeInfo1, avroTypeInfo2 ..., converter1, converter2, ..., buildAvroSink1, buildAvroSink2, ... .

Также у меня есть вопросы по архитектуре. Я попытался выполнить этот код, и Flink корректно работал с различными темами с классами схемы Avro. Какие инструменты кода Flink могут помочь мне поместить различные классы схемы avro в несколько outputStrems и добавить к ним сток? У вас есть примеры кода с ним?

А также, что я мог бы использовать вместо Flink, чтобы решить проблему с созданием нескольких файлов Avro из разных тем Kafka? Возможно, слияние.

1 Ответ

2 голосов
/ 04 февраля 2020

Я немного растерялся из-за вашей мотивации. Общая идея заключается в том, что если вы хотите использовать обобщенный c подход, go с GenericRecord. Если у вас есть указанный c код для различных типов go SpecificRecord, но тогда не используйте обобщенный c код вокруг него.

Далее, если вам не нужно, постарайтесь сделать все возможное, чтобы не смешивать разные события в одной теме / топологии. Скорее порождайте разные топологии в одном и том же основном для каждого подтипа.

def createTopology[T](topic: String) {
  val stream: DataStream[KafkaSourceType] = 
    env.addSource(new FlinkKafkaConsumer[T](topic, AvroDeserializationSchema.forSpecific(T), properties))
  stream.addSink(StreamingFileSink.forBulkFormat(
    Path.fromLocalFile(folder),
    ParquetAvroWriters.forSpecificRecord(T)))
}
...