Сегодня я обнаружил очень странную вещь в государственном магазине Кафки в Google, но не нашел причину такого поведения.
Рассмотрим приведенный ниже государственный магазин, написанный в java:
private KeyValueStore<String, GenericRecord> userIdToUserRecord;
Существует два процессора, которые используют это хранилище состояний.
topology.addStateStore(userIdToUserRecord, ALERT_PROCESSOR_NAME, USER_SETTING_PROCESSOR_NAME)
USER_SETTING_PROCESSOR_NAME поместит данные в хранилище состояний
userIdToUserRecord.put("user-12345", record);
ALERT_PROCESSOR_NAME получит данные из хранилища состояний
userIdToUserRecord.get("user-12345");
Добавление источника в UserSettingProcessor
userSettingTopicName = user-setting-topic;
topology.addSource(sourceName, userSettingTopicName)
.addProcessor(processorName, UserSettingProcessor::new, sourceName);
Добавление источника в AlertEngineProcessor
alertTopicName = alert-topic;
topology.addSource(sourceName, alertTopicName)
.addProcessor(processorName, AlertEngineProcessor::new, sourceName);
Случай 1: Создание записи с использованием продукта Kafka в java Первая запись в topi c user-setting-topi c с использованием java добавит запись пользователя в хранилище состояний. Вторая запись в topi c alert-topi *. 1080 * с использованием java будет принимать запись из хранилища состояний с использованием идентификатора пользователя userIdToUserRecord.get ("user-12345");
Работает нормально, я использую kafkaavroproducer для производства запись в обе топи c
Случай 2: Сначала произвести запись в топи c user-setting-topi c с использованием python добавит запись пользователя в хранилище состояний * userIdToUserRecord.put ("user-100", GenericRecord);
Вторая запись записи в topi c alert-topi c с использованием java будет принимать запись из хранилища состояний с использованием идентификатора пользователя userIdToUserRecord.get ("user-100");
странное случается здесь userIdToUserRecord.get (" user-100 ") вернет null
Я проверяю сценарий, подобный этому, также я создаю запись для user-setting-topi c, используя * 1 090 * тогда метод процесса userSettingProcessor вызвал проверку в режиме отладки и пытается получить запись пользователя из хранилища состояний userIdToUserRecord.get ("user-100"), он нормально работал в userSettingProcessor. Я могу получить данные из хранилища состояний
Затем я создаю запись для alert-topi c, используя java, затем пытаюсь получить userIdToUserRecord.get ("user-100"), он вернет null
я не знаю это странное поведение кто-нибудь расскажет мне об этом поведении.
Python код:
value_schema = avro.load('user-setting.avsc')
value = {
"user-id":"user-12345",
"client_id":"5cfdd3db-b25a-4e21-a67d-462697096e20",
"alert_type":"WORK_ORDER_VOLUME"
}
print("------------------------Kafka Producer------------------------------")
avroProducer = AvroProducer(
{'bootstrap.servers': 'localhost:9092', 'schema.registry.url': 'http://localhost:8089'},
default_value_schema=value_schema)
avroProducer.produce(topic="user-setting-topic", value=value)
print("------------------------Sucess Producer------------------------------")
avroProducer.flush()
Java код:
Schema schema = new Schema.Parser().parse(schemaString);
GenericData.Record record = new GenericData.Record(schema);
record.put("alert_id","5cfdd3db-b25a-4e21-a67d-462697096e20");
record.put("alert_created_at",123449437L);
record.put("alert_type","WORK_ORDER_VOLUME");
record.put("client_id","5cfdd3db-b25a-4e21-a67d-462697096e20");
//record.put("property_key","property_key-"+i);
record.put("alert_data","{\"alert_trigger_info\":{\"jll_value\":1.4,\"jll_category\":\"internal\",\"name\":\"trade_Value\",\"current_value\":40,\"calculated_value\":40.1},\"work_order\":{\"locations\":{\"country_name\":\"value\",\"state_province\":\"value\",\"city\":\"value\"},\"property\":{\"name\":\"property name\"}}}");
return record;