Как я могу применить фильтр в этой схеме avro, используя Apache Flink - PullRequest
2 голосов
/ 25 января 2020

Здравствуйте, ребята, я десериализирую сообщение Avro Kafka, например:

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer("conekta.public.person",
     new KafkaGenericAvroDeserializationSchema("http://localhost:8081"), kafkaProps);

полученное сообщение выглядит так:

{"before": null, "after": {"id": 257, "status": "c4ca4238a0"}, "source": {"version": "0.9.4.Final", "connector": "postgresql", "name": "conekta", "db": "testdb", "ts_usec": 1579909929965704, "txId": 5847, "lsn": 294339488688, "schema": "public", "table": "person", "snapshot": false, "last_snapshot_record": null, "xmin": null}, "op": "c", "ts_ms": 1579909930004}

{"before": null, "after": {"id": 258, "status": "c4ca4238a0"}, "source": {"version": "0.9.4.Final", "connector": "postgresql", "name": "conekta", "db": "testdb", "ts_usec": 1579910374459669, "txId": 5849, "lsn": 294473695272, "schema": "public", "table": "person", "snapshot": false, "last_snapshot_record": null, "xmin": null}, "op": "c", "ts_ms": 1579910374518}

Как я могу применить фильтр к Получите все записи, где идентификатор <258. Мой потребитель Flink. </p>

Я очень новичок в Apache Flink.

Любая помощь будет высоко ценится.

Спасибо!

1 Ответ

1 голос
/ 25 января 2020

Вы можете переопределить boolean isEndOfStream(GenericRecord nextElement) в KafkaGenericAvroDeserializationSchema, чтобы возвращать true, когда вы хотите, чтобы поток заканчивался.

...