У меня есть kafka topi c с тысячами событий в минуту, и я хочу выполнить некоторую обработку, а затем перейти к Kafka. Лог c построен на окне, допустим, 10 минут, и ему нужно писать logi c (groupby, filter et c) только для сообщений, пришедших в течение этих 10 минут.
Я знаю как читать из кафки, но не уверен, как это сделать окно вещь и применить любую настраиваемую функцию для бизнес-логи c.
Может ли кто-нибудь объяснить мне, как для этого в следующем коде?
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
b_s_env = StreamExecutionEnvironment.get_execution_environment()
b_s_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
b_s_t_env = StreamTableEnvironment.create(b_s_env, environment_settings=b_s_settings)
from pyflink.table.descriptors import Schema, FileSystem, Kafka, Json
b_s_t_env.connect(Kafka()
.version("universal")
.topic("sourcetopic")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
.property("group.id", "group1")) \
.with_format(Json()) \
.with_schema(Schema()
.field("field1", DataTypes.STRING())
.field("field2", DataTypes.STRING()))\
.create_temporary_table("sourcetable")
b_s_t_env.connect(Kafka()
.version("universal")
.topic("sinktopic")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
.with_format(Json()) \
.with_schema(Schema()
.field("field1", DataTypes.STRING())
.field("field2", DataTypes.STRING()))\
.create_temporary_table("sinktable")
b_s_env.from_path("sourcetable") \
.select("field1,field2")
.insert_into("sinktable")
b_s_env.execute("job_name")