Я пытаюсь реализовать многопоточность внутри класса для подключения к API платформы мониторинга и создания пакетов потоков для нескольких уникальных HTTP-запросов.
Проблема в функции Monitor.load_requests ().
Я выполнил различную отладку, уменьшил количество запросов, обрабатываемых сценарием, до 1 и напечатал имя потока, но я не вижу, что передается в threading.Thread.start ().
import json
import os
import requests
from queue import Queue
import threading
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
var_config_file = 'variables.json'
class LoadSourceFile(object):
"""Obtain the absolute path and open any given filename, open the file and return the data"""
def __init__(self):
"""TBC"""
def open_file(self, name):
"""Open the specified file, if it exists and output the data it contains."""
filename = self.get_full_pathname(name)
if os.path.exists(filename): # If the program can find the JSON file, read it into memory.
with open(name) as var_conf_fh:
data = json.load(var_conf_fh)
return data
else:
print('Error: File / path', filename, 'not found!')
@staticmethod
def get_full_pathname(name):
"""Obtain the absolute path and open any given filename."""
file_path = os.path.abspath(os.path.join('.', name))
return file_path
class Monitor(object):
"""Primary worker function."""
def __init__(self):
"""Read in variables from external file."""
self.load_file = LoadSourceFile()
self.api_vars = self.load_file.open_file(var_config_file) # Obtain variables from variable_config.json.
self.protocol = self.api_vars['protocol']
self.api_method_get = 'get'
self.api_method_post = 'post'
self.uname = self.api_vars['monitor_uname']
self.pword = self.api_vars['monitor_pword']
self.headers = {'App-Key': self.api_vars['monitor_api_key'], 'Content-Type': 'application/json'}
self.monitor_url = self.api_vars['monitor_api_url']
self.monitor_version = self.api_vars['monitor_api_version']
self.monitor_path = self.api_vars['monitor_api_url_path']
self.queuing = Queue()
def construct_url(self):
"""Create the correct monitor API URL format from the variable.json file."""
api_url = self.protocol + self.monitor_url + self.monitor_version + self.monitor_path
return api_url
def check_get_summary(self):
"""Return a summary dictionary of the monitor checks."""
monitor_check_summ_url = self.construct_url() # Get the URL.
monitor_check_summary = self.load_requests(self.api_method_get, [monitor_check_summ_url], self.uname,
self.pword, self.headers)
return monitor_check_summary
@staticmethod
def api_connect(*args):
api_method = args[0]
api_url_list = args[1]
uname = args[2]
pword = args[3]
for api_url in api_url_list:
try:
if args.__len__() == 5 and api_method == 'get':
headers = args[4]
try:
data = requests.get(api_url, auth=HTTPBasicAuth(uname, pword), headers=headers,
verify=False).text
return data
except json.JSONDecodeError as err:
print('Error', err)
else:
data = 'Wrong number of arguments.'
return data
# finally:
# self.queuing.task_done()
except json.JSONDecodeError as err:
print('Error', err)
def load_requests(self, *args):
"""Populate queue with list of requests."""
api_method = args[0]
api_url_list = args[1]
uname = args[2]
pword = args[3]
headers = args[4]
for api_url in api_url_list:
self.queuing.put([api_url])
while not self.queuing.empty():
for t in range(0, 10):
t = threading.Thread(target=self.api_connect(api_method, api_url_list, uname, pword, headers))
t.setDaemon(True)
t.start()
t.join()
self.queuing.join()
def main():
monitor_operations = Monitor()
result = monitor_operations.check_get_summary()
print('Result: ', result)
if __name__ == '__main__':
main()
Список запросов (только один в этом первом экземпляре) должен быть помещен в отдельные потоки, но я получаю ошибку при t.start (), утверждающую, что объект str не может быть вызван.
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 917, in _bootstrap_inner
self.run()
File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 865, in run
self._target(*self._args, **self._kwargs)
TypeError: 'str' object is not callable```