Я пытаюсь отправить данные ( input_data ) в python, который выглядит примерно так, как показано ниже, чем kafka.
[{
"stateActionTraffic":"1, S1, 1, A1, 20, 0.65
1, S1, 2, A2, 20, 0.65
1, S1, 3, A3, 20, 0.65
2, S2, 1, A1, 25, 0.65
2, S2, 2, A2, 25, 0.65
2, S2, 3, A3, 25, 0.65"
"messageType":"split",
"Id":1232324334
}]
По существу, тип данных input_data - это список
Код Кафки, который я пробовал, следующий для отправки в python (3.x) к Кафке:
import logging
from kafka.errors import KafkaError
logging.basicConfig(level=logging.INFO)
kafka_brokers=['kafka1-1-stagepf.stage-sin.real.com:9092']
producer = KafkaProducer(bootstrap_servers=kafka_brokers,value_serializer=lambda v: json.dumps(v).encode('utf-8'), api_version=(1,4,6))
producer.send('topic', input_data)
producer.flush()
Но при ведении журнала я вижу, что данные вообще не были отправлены со следующимсгенерированный журнал:
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka1-1-stagepf.stage-sin.real.com:9092 <connecting> [IPv4 ('10.14.27.24', 9092)]>: connecting to kafka1-1-stagepf.stage-sin.real.com:9092 [('10.14.27.24', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka1-1-stagepf.stage-sin.real.com:9092 <connecting> [IPv4 ('10.14.27.24', 9092)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection node_id=2 host=10.14.26.71:9092 <connecting> [IPv4 ('10.14.26.71', 9092)]>: connecting to 10.14.26.71:9092 [('10.14.26.71', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=2 host=10.14.26.71:9092 <connecting> [IPv4 ('10.14.26.71', 9092)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka1-1-stagepf.stage-sin.real.com:9092 <connected> [IPv4 ('10.14.27.24', 9092)]>: Closing connection.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka1-1-stagepf.stage-sin.real.com:9092 <connecting> [IPv4 ('10.14.27.24', 9092)]>: connecting to kafka1-1-stagepf.stage-sin.real.com:9092 [('10.14.27.24', 9092) IPv4]
WARNING:kafka.conn:SSL connection closed by server during handshake.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka1-1-stagepf.stage-sin.real.com:9092 <handshake> [IPv4 ('10.14.27.24', 9092)]>: Closing connection. KafkaConnectionError: SSL connection closed by server during handshake
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka1-1-stagepf.stage-sin.real.com:9092 <connecting> [IPv4 ('10.14.27.24', 9092)]>: connecting to kafka1-1-stagepf.stage-sin.real.com:9092 [('10.14.27.24', 9092) IPv4]
WARNING:kafka.conn:SSL connection closed by server during handshake.
Таким образом, данные не отправляются в домен kafka в его тему, как вы можете видеть из файла журнала, где пытается установить соединение, а затем отключается. Он говорит соединение установлено , но затем сразу же появляется сообщение отключено . Не уверен, что здесь происходит. Любая помощь будет оценена.