Государственный магазин "Кафка персистентность" не работал с совместным использованием python и java - PullRequest
1 голос
/ 29 февраля 2020

Сегодня я обнаружил очень странную вещь в государственном магазине Кафки в Google, но не нашел причину такого поведения.

Рассмотрим приведенный ниже государственный магазин, написанный в java:

private KeyValueStore<String, GenericRecord> userIdToUserRecord;

Существует два процессора, которые используют это хранилище состояний.

  topology.addStateStore(userIdToUserRecord, ALERT_PROCESSOR_NAME, USER_SETTING_PROCESSOR_NAME)

USER_SETTING_PROCESSOR_NAME поместит данные в хранилище состояний

userIdToUserRecord.put("user-12345", record);

ALERT_PROCESSOR_NAME получит данные из хранилища состояний

userIdToUserRecord.get("user-12345");

Добавление источника в UserSettingProcessor

userSettingTopicName = user-setting-topic;    
topology.addSource(sourceName, userSettingTopicName)
                    .addProcessor(processorName, UserSettingProcessor::new, sourceName);

Добавление источника в AlertEngineProcessor

alertTopicName = alert-topic;
topology.addSource(sourceName, alertTopicName)
                    .addProcessor(processorName, AlertEngineProcessor::new, sourceName);

Случай 1: Создание записи с использованием продукта Kafka в java Первая запись в topi c user-setting-topi c с использованием java добавит запись пользователя в хранилище состояний. Вторая запись в topi c alert-topi *. 1080 * с использованием java будет принимать запись из хранилища состояний с использованием идентификатора пользователя userIdToUserRecord.get ("user-12345");

Работает нормально, я использую kafkaavroproducer для производства запись в обе топи c

Случай 2: Сначала произвести запись в топи c user-setting-topi c с использованием python добавит запись пользователя в хранилище состояний * userIdToUserRecord.put ("user-100", GenericRecord);

Вторая запись записи в topi c alert-topi c с использованием java будет принимать запись из хранилища состояний с использованием идентификатора пользователя userIdToUserRecord.get ("user-100");

странное случается здесь userIdToUserRecord.get (" user-100 ") вернет null

Я проверяю сценарий, подобный этому, также я создаю запись для user-setting-topi c, используя * 1 090 * тогда метод процесса userSettingProcessor вызвал проверку в режиме отладки и пытается получить запись пользователя из хранилища состояний userIdToUserRecord.get ("user-100"), он нормально работал в userSettingProcessor. Я могу получить данные из хранилища состояний

Затем я создаю запись для alert-topi c, используя java, затем пытаюсь получить userIdToUserRecord.get ("user-100"), он вернет null

я не знаю это странное поведение кто-нибудь расскажет мне об этом поведении.

Python код:

value_schema = avro.load('user-setting.avsc')
value = {
    "user-id":"user-12345",
    "client_id":"5cfdd3db-b25a-4e21-a67d-462697096e20",
    "alert_type":"WORK_ORDER_VOLUME"
}

print("------------------------Kafka Producer------------------------------")
avroProducer = AvroProducer(
    {'bootstrap.servers': 'localhost:9092', 'schema.registry.url': 'http://localhost:8089'},
    default_value_schema=value_schema)
avroProducer.produce(topic="user-setting-topic", value=value)
print("------------------------Sucess Producer------------------------------")
avroProducer.flush() 

Java код:

 Schema schema = new Schema.Parser().parse(schemaString);

        GenericData.Record record = new GenericData.Record(schema);
        record.put("alert_id","5cfdd3db-b25a-4e21-a67d-462697096e20");
        record.put("alert_created_at",123449437L);
        record.put("alert_type","WORK_ORDER_VOLUME");
        record.put("client_id","5cfdd3db-b25a-4e21-a67d-462697096e20");
        //record.put("property_key","property_key-"+i);

        record.put("alert_data","{\"alert_trigger_info\":{\"jll_value\":1.4,\"jll_category\":\"internal\",\"name\":\"trade_Value\",\"current_value\":40,\"calculated_value\":40.1},\"work_order\":{\"locations\":{\"country_name\":\"value\",\"state_province\":\"value\",\"city\":\"value\"},\"property\":{\"name\":\"property name\"}}}");
        return record;

1 Ответ

1 голос
/ 10 марта 2020

Проблема заключается в том, что производитель Java и производитель Python (основанный на производителе C) используют разные функции ha sh по умолчанию для разделения данных. Вам нужно будет предоставить настроенное разделение одному (или обоим), чтобы убедиться, что они используют одну и ту же стратегию разделения.

К сожалению, доза протокола Kafka не указывает, какой должна быть функция разделения по умолчанию ha sh, и таким образом, клиенты могут использовать все, что они хотят по умолчанию.

...