Как восстановить потоковые данные, даже если соединение потеряно с клиентом Python Elasticsearch? - PullRequest
1 голос
/ 16 июня 2020

Я транслирую данные RESTful API из https://www.n2yo.com/api/, которые используются для отслеживания местоположения спутников. Я использую клиент python с Elasticsearch. Я сохраняю потоковые данные в ES каждые 10 секунд и визуализирую Kibana. Мой ES vesrion - 6.4.3

Мой код:

URL = "https://www.n2yo.com/rest/v1/satellite/positions/25544/41.702/-76.014/0/2/&apiKey= your key"



es = Elasticsearch('http://ip:port',timeout=600)

settings = { "settings": {
                 "number_of_shards":1,
                  'number_of_replicas':0
                 },
      "mappings" : { 
           "document" : {
                "properties":{
                    "geo": {
                       "type": "geo_point"
                            }
                          }
                        } 
                     } 
                  }
try:
 es.indices.create(index = "spacestation", body=settings)
except RequestError as es1:
 print('Index already exists!!')
 sys.exit(1)

def collect_data():
  data = requests.get(url = URL).json() 
  del data['positions'][1]
  new_data = {'geo':{'lat':data['positions'][0]['satlatitude'],
               'lon':data['positions'][0]['satlongitude']}, 
                'satname': data['info']['satname'], 'satid': data['info']['satid'], 
                  'timestamp':datetime.fromtimestamp(data['positions'][0]['timestamp']).isoformat()        
              }

  es.index(index='spacestation', doc_type='document', body=new_data)

schedule.every(10).seconds.do(collect_data)

while True:
  schedule.run_pending()
  time.sleep(1) 

Мой вопрос: вчера я потерял соединение. Ошибка выглядит следующим образом:

requests.exceptions.ConnectionError: HTTPSConnectionPool (host = 'www.n2yo.com', port = 443): Превышено максимальное количество повторных попыток с URL: / rest / v1 / satellite / plays /25544/41.702/-76.014/0/2/&apiKey= (Вызвано NewConnectionError (': Не удалось установить sh новое соединение: [Errno -3] Временный сбой в разрешении имен',))

Когда я повторно запускаю свой код, я не могу, потому что индекс уже существует. Если я удалю индекс, я потеряю свои данные, которые уже находятся в ES. Что я могу сделать? Мне нужно сохранить мои сохраненные данные, и мне нужно запустить задание с этого момента. Какие-нибудь решения, пожалуйста?

1 Ответ

1 голос
/ 16 июня 2020

Просто создайте индекс, только если вы получаете данные с n2yo.com. вы должны использовать функцию es.indices.exists. Затем вы делаете свою функцию collect_data рекурсивной в случае сбоя. Попробуйте:

 URL = "https://www.n2yo.com/rest/v1/satellite/positions/25544/41.702/-76.014/0/2/&apiKey= your key"



es = Elasticsearch('http://ip:port',timeout=600)

def create_index()

    if not es.indices.exists(index = "spacestation"):

        settings = { "settings": {
                 "number_of_shards":1,
                  'number_of_replicas':0
                 },
      "mappings" : { 
           "document" : {
                "properties":{
                    "geo": {
                       "type": "geo_point"
                            }
                          }
                        } 
                     } 
                  }
          es.indices.create(index = "spacestation", body=settings)
    else:
        print('Index already exists!!')


def collect_data():
  try:
      data = requests.get(url = URL).json()
      create_index() 
      del data['positions'][1]
      new_data = {'geo':{'lat':data['positions'][0]['satlatitude'],
               'lon':data['positions'][0]['satlongitude']}, 
                'satname': data['info']['satname'], 'satid': data['info']['satid'], 
                  'timestamp':datetime.fromtimestamp(data['positions'][0]['timestamp']).isoformat()        
              }

      es.index(index='spacestation', doc_type='document', body=new_data)
    except:
        collect_data()

schedule.every(10).seconds.do(collect_data)

while True:
  schedule.run_pending()
  time.sleep(1) 
...