я хочу получать данные о погоде в реальном времени из API openweathmap.org с помощью KafkaProducer - PullRequest
0 голосов
/ 17 октября 2019

Я пытаюсь получить оперативные данные из openweathermap.org API с помощью kafka Producer python. Ниже код работает без ошибок, но не производит вывод. Я не могу увидеть вывод в потребительском терминале Kafka.

Пожалуйста, игнорируйте форматирование, только сосредоточиться на проблеме, это будет большая помощь. Спасибо в Advanced

 #import logging
 import time
 import json
 from kafka import KafkaProducer
 import requests

 kafka_bootstrap_servers = 'broker_ipaddress:9092'
 kafka_topic_name = 'sampletopic'

 producer = KafkaProducer(bootstrap_servers=kafka_bootstrap_servers,value_serializer=lambda v: json.dumps(v).encode('utf-8'))

 json_message = None
 city_name = None
 temperature = None
 humidity = None
 openweathermap_api_endpoint = None
 appid ="80c0315065d76e0bb5dde5020f70f82c"

 def get_weather_detail(openweathermap_api_endpoint):
     api_response = requests.get(openweathermap_api_endpoint)
     json_data = api_response.json()
     city_name = json_data["name"]
     humidity = json_data["main"] ["humidity"]
     temperature = json_data["main"] ["temp"]
     json_message = {"CityName": city_name, "Temperature": temperature, "Humidity": humidity, "creationTime": time.strftime("%Y-%m-%d %H: %M: %S")}
     return json_message

 def get_appid(appid):

     while True:
         city_name = "New Delhi"
         appid = get_appid(appid)
         openweathermap_api_endpoint = "http://api.openweathermap.org/data/2.5/weather?appid="+appid+"&q="+city_name
         json_message = get_weather_detail(openweathermap_api_endpoint)
         producer.send(kafka_topic_name, json_message)
         print("Published message 1:" + json.dumps(json_message))
         print("Wait for 2 seconds ....")
         time.sleep(2)


         city_name = "Kolkata"
         appid = get_appid(appid)
         openweathermap_api_endpoint = "http://api.openweathermap.org/data/2.5/weather?appid="+appid+"&q="+city_name
         json_message = get_weather_detail(openweathermap_api_endpoint)
         producer.send(kafka_topic_name, json_message)
         print("Published message 2:" + json.dumps(json_message))
         print("Wait for 2 seconds ....")
         time.sleep(2)

         city_name = "Mumbai"
         appid = get_appid(appid)
         openweathermap_api_endpoint = "http://api.openweathermap.org/data/2.5/weather?appid="+appid+"&q="+city_name
         json_message = get_weather_detail(openweathermap_api_endpoint)
         producer.send(kafka_topic_name, json_message)
         print("Published message 3:" + json.dumps(json_message))
         print("Wait for 2 seconds ....")
         time.sleep(2)

         city_name = "Banglore"
         appid = get_appid(appid)
         openweathermap_api_endpoint = "http://api.openweathermap.org/data/2.5/weather?appid="+appid+"&q="+city_name
         json_message = get_weather_detail(openweathermap_api_endpoint)
         producer.send(kafka_topic_name, json_message)
         print("Published message 4:" + json.dumps(json_message))
         print("Wait for 2 seconds ....")
         time.sleep(2)
...