Я настроил tweepy для получения твитов и записи в topi c TWEEPY_TOPI C и потока для чтения из topi c.
-- Create topic for tweepy to write into
CREATE STREAM TWEEPY_STREAM (
id BIGINT,
lang VARCHAR,
tweet VARCHAR,
user STRUCT<id BIGINT,
screen_name VARCHAR>)
WITH (
KAFKA_TOPIC= 'TWEEPY_TOPIC',
VALUE_FORMAT = 'AVRO'
);
Есть еще один поток, который читает из вышеуказанного потока и записывает его в другой topi c (который передается в поиск elasti c с помощью kafka-connect).
-- Create another topic with ML data.
-- GETSENTIMENT and GETFOURCLASS are custom ksql functions
CREATE STREAM ELASTIC_STREAM
WITH (
KAFKA_TOPIC = 'ELASTIC_TOPIC',
VALUE_FORMAT = 'AVRO',
PARTITIONS = 1, REPLICAS = 1
)
AS SELECT
id,
lang,
tweet,
user,
GETSENTIMENT(tweet) as sentiment,
GETFOURCLASS(tweet) as fourclass
FROM TWEEPY_STREAM;
Пользовательские функции GETSENTIMENT
и GETFOURCLASS
сделать POST-запрос к серверу модели python, который возвращает классификацию. Этот ответ API в настоящее время занимает около 0,5–1 секунды.
Меня беспокоит то, что если данные в первом топи c TWEEPY_TOPIC
будут удалены после периода хранения по умолчанию (7 дней), они не будут вас заберет ELASTIC_STREAM
. Есть ли способ установить какой-то флаг, чтобы kafka не удалял данные, которые еще не были обработаны? Я также открыт для предложений по редизайну.