Обработка сообщений Kafka через Flink SQL Python - PullRequest
0 голосов
/ 12 июля 2020

У меня есть kafka topi c с тысячами событий в минуту, и я хочу выполнить некоторую обработку, а затем перейти к Kafka. Лог c построен на окне, допустим, 10 минут, и ему нужно писать logi c (groupby, filter et c) только для сообщений, пришедших в течение этих 10 минут.

Я знаю как читать из кафки, но не уверен, как это сделать окно вещь и применить любую настраиваемую функцию для бизнес-логи c.

Может ли кто-нибудь объяснить мне, как для этого в следующем коде?

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes

b_s_env = StreamExecutionEnvironment.get_execution_environment()
b_s_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
b_s_t_env = StreamTableEnvironment.create(b_s_env, environment_settings=b_s_settings)

from pyflink.table.descriptors import Schema, FileSystem, Kafka, Json

b_s_t_env.connect(Kafka()
                 .version("universal")
                 .topic("sourcetopic")
                 .property("zookeeper.connect", "localhost:2181")
                 .property("bootstrap.servers", "localhost:9092")
                 .property("group.id", "group1")) \
         .with_format(Json()) \
         .with_schema(Schema()
                     .field("field1", DataTypes.STRING())
                     .field("field2", DataTypes.STRING()))\
         .create_temporary_table("sourcetable")

b_s_t_env.connect(Kafka()
                 .version("universal")
                 .topic("sinktopic")
                 .property("zookeeper.connect", "localhost:2181")
                 .property("bootstrap.servers", "localhost:9092")
         .with_format(Json()) \
         .with_schema(Schema()
                     .field("field1", DataTypes.STRING())
                     .field("field2", DataTypes.STRING()))\
         .create_temporary_table("sinktable")

b_s_env.from_path("sourcetable") \
      .select("field1,field2")
      .insert_into("sinktable")
      
b_s_env.execute("job_name")

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...