Я пытаюсь получить оперативные данные из openweathermap.org API с помощью kafka Producer python. Ниже код работает без ошибок, но не производит вывод. Я не могу увидеть вывод в потребительском терминале Kafka.
Пожалуйста, игнорируйте форматирование, только сосредоточиться на проблеме, это будет большая помощь. Спасибо в Advanced
#import logging
import time
import json
from kafka import KafkaProducer
import requests
kafka_bootstrap_servers = 'broker_ipaddress:9092'
kafka_topic_name = 'sampletopic'
producer = KafkaProducer(bootstrap_servers=kafka_bootstrap_servers,value_serializer=lambda v: json.dumps(v).encode('utf-8'))
json_message = None
city_name = None
temperature = None
humidity = None
openweathermap_api_endpoint = None
appid ="80c0315065d76e0bb5dde5020f70f82c"
def get_weather_detail(openweathermap_api_endpoint):
api_response = requests.get(openweathermap_api_endpoint)
json_data = api_response.json()
city_name = json_data["name"]
humidity = json_data["main"] ["humidity"]
temperature = json_data["main"] ["temp"]
json_message = {"CityName": city_name, "Temperature": temperature, "Humidity": humidity, "creationTime": time.strftime("%Y-%m-%d %H: %M: %S")}
return json_message
def get_appid(appid):
while True:
city_name = "New Delhi"
appid = get_appid(appid)
openweathermap_api_endpoint = "http://api.openweathermap.org/data/2.5/weather?appid="+appid+"&q="+city_name
json_message = get_weather_detail(openweathermap_api_endpoint)
producer.send(kafka_topic_name, json_message)
print("Published message 1:" + json.dumps(json_message))
print("Wait for 2 seconds ....")
time.sleep(2)
city_name = "Kolkata"
appid = get_appid(appid)
openweathermap_api_endpoint = "http://api.openweathermap.org/data/2.5/weather?appid="+appid+"&q="+city_name
json_message = get_weather_detail(openweathermap_api_endpoint)
producer.send(kafka_topic_name, json_message)
print("Published message 2:" + json.dumps(json_message))
print("Wait for 2 seconds ....")
time.sleep(2)
city_name = "Mumbai"
appid = get_appid(appid)
openweathermap_api_endpoint = "http://api.openweathermap.org/data/2.5/weather?appid="+appid+"&q="+city_name
json_message = get_weather_detail(openweathermap_api_endpoint)
producer.send(kafka_topic_name, json_message)
print("Published message 3:" + json.dumps(json_message))
print("Wait for 2 seconds ....")
time.sleep(2)
city_name = "Banglore"
appid = get_appid(appid)
openweathermap_api_endpoint = "http://api.openweathermap.org/data/2.5/weather?appid="+appid+"&q="+city_name
json_message = get_weather_detail(openweathermap_api_endpoint)
producer.send(kafka_topic_name, json_message)
print("Published message 4:" + json.dumps(json_message))
print("Wait for 2 seconds ....")
time.sleep(2)