Я пытаюсь изменить код бегущего Flink, чтобы он мог читать данные из нескольких тем Kafka и записывать их в разные папки HDFS соответственно и без объединения. У меня есть много методов Java и Scala generi c и инициации объекта generi c внутри метода основного процесса и отражения. Он работает правильно с одной схемой Avro, но когда я пытаюсь добавить неизвестное количество схем Avro, у меня возникает проблема с обобщениями и конструкциями отражения.
Как решить? Какой шаблон проектирования мне может помочь?
Модель (схема Avro) представлена в Java классах.
public enum Types implements MessageType {
RECORD_1("record1", "01", Record1.getClassSchema(), Record1.class),
RECORD_2("record2", "02", Record2.getClassSchema(), Record2.class);
private String topicName;
private String dataType;
private Schema schema;
private Class<? extends SpecificRecordBase> clazz;}
public class Record1 extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord
{
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("???");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
... }
public class Record1 ...
Черта процесса с основными методами процесса.
import org.apache.avro.specific.SpecificRecordBase
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.Writer
import tests.{Record1, Record2, Types}
import scala.reflect.ClassTag
trait Converter[T] extends Serializable {
def convertToModel(message: KafkaSourceType): T
}
trait FlinkRunner extends Serializable {
val kafkaTopicToModelMapping: Map[String, Class[_ <: SpecificRecordBase]] =
Map(
"record_1" -> Types.RECORD_1.getClassType,
"record_2" -> Types.RECORD_2.getClassType
)
def buildAvroSink1(path: String, writer1: Writer[Record1]): BucketingSink[Record1] = ???
def buildAvroSink2(path: String, writer2: Writer[Record2]): BucketingSink[Record2] = ???
def process(topicList: List[String], env: StreamExecutionEnvironment): Unit = {
// producer kafka source building
val clazz1: Class[Record1] = ClassTag(kafkaTopicToModelMapping(topicList.head)).runtimeClass.asInstanceOf[Class[Record1]]
val clazz2: Class[Record2] = ClassTag(kafkaTopicToModelMapping(topicList.tail.head)).runtimeClass.asInstanceOf[Class[Record2]]
// How to makes clazzes list from that val clazzes: List[Class[???]] = ???
val avroTypeInfo1: TypeInformation[Record1] = TypeInformation.of(clazz1)
val avroTypeInfo2: TypeInformation[Record2] = TypeInformation.of(clazz2)
// How to makes clazzes list from that val avroTypeInfos = ???
val stream: DataStream[KafkaSourceType] = ???
// consumer avro paths building, it
val converter1: Converter[Record1] = new Converter[Record1] {
override def convertToModel(message: KafkaSourceType): Record1 = deserializeAvro[Record1](message.value)
}
val converter2: Converter[Record2] = new Converter[Record2] {
override def convertToModel(message: KafkaSourceType): Record2 = deserializeAvro[Record2](message.value)
}
// How to makes converters list from that
val outputResultStream1 = stream
.filter(_.topic == topicList.head)
.map(record => converter1.convertToModel(record))(avroTypeInfo1)
val outputResultStream2 = stream
.filter(_.topic == topicList.tail.head)
.map(record => converter2.convertToModel(record))(avroTypeInfo2)
val writer1 = new AvroSinkWriter[Record1](???)
val writer2 = new AvroSinkWriter[Record2](???)
// add sink and start process
}
}
КАК ЕСТЬ В Кафке есть несколько разных тем. Версия Kafka 10.2 без Confluent. Каждая топика Kafka c работает только с одним классом схемы Avro, написанным на Java. Единственное задание Flink (записано в Scala) считывает только одну топи c, конвертирует с одной схемой Avro и записывает данные только в одну папку в HDFS. Имя, путь и имя выходной папки находятся в конфигурации. Например, есть 3 потока работ с параметрами:
Первый поток работ
--brokersAdress …
--topic record1
--folderName folder1
-- avroClassName Record1
--output C:/….
--jobName SingleTopic1
--number_of_parallel 2
--number_of_task 1
--mainClass Runner
….
Второй поток работ
--brokersAdress …
--topic record1
--folderName folder1
-- avroClassName Record1
--output C:/….
--jobName SingleTopic2
--number_of_parallel 2
--number_of_task 1
--mainClass Runner
….
Третий поток работ
…
TO BE Одно задание Flink может читать более одной темы Kafka, преобразовывать ее с использованием другой схемы Avro и записывать данные в разные папки без объединения. Например, я могу запустить только один поток работ, который будет выполнять ту же работу
--brokersAdress …
--topic record1, record2, record3
--folderName folder1, folder2,
-- avroClassName Record1, Record2
--output C:/….
--jobName MultipleTopics
--number_of_parallel 3
--number_of_task 3
--mainClass Runner
...
Хорошо, спасибо. Есть несколько вопросов об организации кода: 1) Как обобщить переменные в параметрах метода и методов (называемых процедурами), чтобы позволить инициировать Список из нескольких строк с унаследованными от классов SpecificRecordBase? Если возможно, конечно.
val clazz1: Class[Record1] = ClassTag(kafkaTopicToModelMapping(topicList.head)).runtimeClass.asInstanceOf[Class[Record1]]
val clazz2: Class[Record2] = ClassTag(kafkaTopicToModelMapping(topicList.tail.head)).runtimeClass.asInstanceOf[Class[Record2]]
2) Тот же вопрос для avroTypeInfo1, avroTypeInfo2 ..., converter1, converter2, ..., buildAvroSink1, buildAvroSink2, ... .
Также у меня есть вопросы по архитектуре. Я попытался выполнить этот код, и Flink корректно работал с различными темами с классами схемы Avro. Какие инструменты кода Flink могут помочь мне поместить различные классы схемы avro в несколько outputStrems и добавить к ним сток? У вас есть примеры кода с ним?
А также, что я мог бы использовать вместо Flink, чтобы решить проблему с созданием нескольких файлов Avro из разных тем Kafka? Возможно, слияние.