Я пытаюсь создать бота для телеграмм, который будет вести мониторинг. Я провел некоторые исследования и эксперименты и думаю, что правильным способом для этого является использование потоков. В настоящее время я сделал более или менее работающий прототип, код ниже. И столкнулся со следующими проблемами, которые, надеюсь, вы можете помочь решить.
Много времени я потратил, чтобы дать возможность правильно запустить и остановить мониторинг потока. И теперь я использую переменную Thread как глобальную, чтобы изменить ее статус. Это правильно, используя в моем коде? Может быть, есть лучшее решение для этого?
Хотя это маловероятный случай, но я бы хотел ограничить поток только одним istance - чтобы он не дублировал. Так как весь мониторинг будет проходить в одном потоке. Может ли кто-нибудь помочь мне с этим?
Может быть, есть какие-то другие критические проблемы в коде, которые болят глаза?
#!/usr/bin/python3
from telegram.ext import Updater, CommandHandler, Filters, MessageHandler, CallbackQueryHandler, PicklePersistence
from telegram import ReplyKeyboardMarkup, InlineKeyboardButton, InlineKeyboardMarkup, KeyboardButton
from telegram.error import NetworkError, Unauthorized, TimedOut
import logging
import os
from datetime import timedelta
import pickle
import numpy as np
import requests
import time
import json
import threading
from threading import Thread
dn = os.path.dirname(os.path.realpath(__file__))
# Token for telegram bot
TOKEN=xxx
# Pickle to store unique users
USERS_PICKLE = os.path.join(dn,"users.pkl")
# Pickle to store megarmarket and ashan notification users
SOME_USERS_PICKLE = os.path.join(dn,"some_users.pkl")
# Enable logging
logger = logging.getLogger(__name__)
###...etc
#######################################################
# Class for monitoring thread
class Monitoring(Thread):
def __init__(self, updater):
super(Monitoring, self).__init__()
self.running = True
self.updater = updater
def run(self):
while self.running:
# some monitoring - get request, check result.. etc. if Ok - send message.
try:
with open(SOME_USERS_PICKLE, 'rb') as f:
some_registered_users = pickle.load(f)
except:
some_registered_users = {}
del_plan = get_result()
status = check_status(del_plan)
if status[0]:
for usr in some_registered_users.keys():
try:
self.updater.bot.send_message(chat_id=usr, text="XXX smthng{},{}".format(status[1],status[2]), disable_web_page_preview=True)
except Unauthorized:
logger.info("User have blocked bot: {},{}".format(usr, check_status_stores[usr]))
except TimedOut:
logger.info("Message sending timed out..")
else:
for usr in some_registered_users.keys():
try:
self.updater.bot.send_message(chat_id=usr, text="?")
except Unauthorized:
logger.info("User blocked bot: {},{}".format(usr, check_status_stores[usr]))
except TimedOut:
logger.info("Message sending timed out..")
# some another group monitoring..
try:
with open(SOME_USERS_PICKLE2, 'rb') as f:
some_registered_users = pickle.load(f)
except:
some_registered_users = {}
del_plan = get_result_2()
status = check_status_stores(del_plan)
if status[0]:
for usr in some_registered_users.keys():
try:
self.updater.bot.send_message(chat_id=usr, text="XXX smthng{},{}".format(status[1],status[2]), disable_web_page_preview=True)
except Unauthorized:
logger.info("User have blocked bot: {},{}".format(usr, check_status_stores[usr]))
except TimedOut:
logger.info("Message sending timed out..")
else:
for usr in some_registered_users.keys():
try:
self.updater.bot.send_message(chat_id=usr, text="?")
except Unauthorized:
logger.info("User blocked bot: {},{}".format(usr, check_status_stores[usr]))
except TimedOut:
logger.info("Message sending timed out..")
##############################################################
# Define command handlers
def start(update, context):
logger.info("User {} started bot".format(update.effective_user["id"])) #update.message.chat_id
update.message.reply_text('Hi, {}'.format(update.message.from_user.first_name))
#custom keyborad
custom_keyboard = [['/start', '/help'],['/monitor_']] #'
reply_markup = ReplyKeyboardMarkup(keyboard=custom_keyboard, resize_keyboard=True, one_time_keyboard=True)
context.bot.send_message(chat_id=update.effective_chat.id,
text="Select '/monitor_'",
reply_markup=reply_markup)
# save unique users to pickle (open existing)
try:
with open(USERS_PICKLE, 'rb') as f:
registered_users = pickle.load(f)
except:
registered_users = {}
registered_users[update.effective_chat.id] = (update.effective_user["first_name"], update.effective_user["last_name"])
with open(USERS_PICKLE, 'wb') as f:
pickle.dump(registered_users, f)
logger.info("user dict: {}".format(registered_users))
# Stop monitoring Thread
def stop_monitoring(update, context):
global monitoring
update.message.reply_text('End monitoring...')
monitoring.running = False # stop the thread
# Get some data from monitoring Thread
# def get(update, context, t=monitoring):
# update.message.reply_text('OK, the current value is {}'.format(t.data))
def help(update, context):
context.bot.send_message(chat_id=update.effective_chat.id,
text=""" bla-bla""")
def select_(update, context):
a_list = ['A', 'B', 'C', 'D']
inline_kb=[]
crossIcon = u"\u274C"
checkIcon = u"\u2705"
for s in a_list:
inline_kb.append([InlineKeyboardButton(text=checkIcon+" "+s, callback_data="monitoring_ "+s), InlineKeyboardButton(text=crossIcon+" Unsubscribe "+s, callback_data="unsubscribe_ "+s)])
reply_markup = InlineKeyboardMarkup(inline_kb)
update.message.reply_text('Оберіть магазин:', reply_markup=reply_markup)
def register_monitoring_user(update, context):
logger.info("User {} registered for {}".format(update.effective_user["id"],update.callback_query.data))
query = update.callback_query
code = query.data.split(' ')[-1]
context.bot.answer_callback_query(query.id, "You are registering for {}".format(code))
#query.edit_message_text(text="Prediction for: {}".format(query.data))
if code=='A':
# save users to pickle (open existing)
try:
with open(SOME_USERS_PICKLE, 'rb') as f:
registered_users = pickle.load(f)
except:
registered_users = {}
registered_users[update.effective_chat.id] = (update.effective_user["first_name"], update.effective_user["last_name"])
with open(SOME_USERS_PICKLE, 'wb') as f:
pickle.dump(registered_users, f)
logger.info("A user dict: {}".format(registered_users))
elif code=='B':
#.......
context.bot.send_message(chat_id=update.effective_chat.id, text="?...".format(code))
def unsubscribe_monitoring_user(update, context):
logger.info("User {} unsubscribed from {}".format(update.effective_user["id"],update.callback_query.data))
query = update.callback_query
code = query.data.split(' ')[-1]
context.bot.answer_callback_query(query.id, "You are unsubscribing..{}".format(code))
if code=='A':
# save users to pickle (open existing)
try:
with open(SOME_USERS_PICKLE, 'rb') as f:
registered_users = pickle.load(f)
registered_users.pop(update.effective_chat.id, None)
except:
registered_users = {}
with open(SOME_USERS_PICKLE, 'wb') as f:
pickle.dump(registered_users, f)
logger.info("A user dict: {}".format(registered_users))
elif code=='B':
#.....
context.bot.send_message(chat_id=update.effective_chat.id, text="Unsubscribed..".format(code))
def unknown(update, context):
context.bot.send_message(chat_id=update.effective_chat.id, text="WTF "+update.message.text+"? ?")
def error(update, context):
"""Log Errors caused by Updates."""
logger.warning('Update "%s" caused error "%s"', update, context.error)
############ request processing
def get_result():
url = "google.com"
headers = {"authority":"xxx"
,"path":"/xxx3"
,"origin":"xxxx"
,"referer":"xxx"
,"user-agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36"
,"x-chain":"yyy"}
try:
response = requests.get(url, headers=headers)
except requests.exceptions.ConnectionError:
print("Connection refused")
return False
# print the response status code
print(response.status_code)
if response.status_code==200:
try:
return response.json()
except json.decoder.JSONDecodeError:
print("Response Not JSON: {}".format(response.text))
return False
except Exception as e:
logging.error(e)
return False
else:
logger.info("Error in response: {}".format(response.status_code))
return False
def get_result_2():
url = "google.com"
headers = {"authority":"xxx"
,"path":"/xxx3"
,"origin":"xxxx"
,"referer":"xxx"
,"user-agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36"
,"x-chain":"yyy"}
try:
response = requests.get(url, headers=headers)
except requests.exceptions.ConnectionError:
print("Connection refused")
return False
# print the response status code
print(response.status_code)
if response.status_code==200:
try:
return response.json()
except json.decoder.JSONDecodeError:
print("Response Not JSON: {}".format(response.text))
return False
except Exception as e:
logging.error(e)
return False
else:
logger.info("Error in response: {}".format(response.status_code))
return False
def check_status(json):
if json==False:
return(False, False, False)
else:
status = None
for day in json:
for slot in day[list(day.keys())[1]]:
if slot['is_open']:
status = True
date = slot['date']
time = slot['time_range']
return(status, date, time)
return(False, False, False)
############
def main():
# Register handlers in Dispatcher
start_handler = CommandHandler('start', start) # start handler
updater.dispatcher.add_handler(start_handler) # register start handler
updater.dispatcher.add_handler(CommandHandler('help', help))
updater.dispatcher.add_handler(CommandHandler('monitor_', select_))
updater.dispatcher.add_handler(CommandHandler('stop_monitoring', stop_monitoring))
updater.dispatcher.add_handler(CallbackQueryHandler(register_monitoring_user, pattern=r'^monitoring_ '))
updater.dispatcher.add_handler(CallbackQueryHandler(unsubscribe_monitoring_user, pattern=r'^unsubscribe_ '))
#
updater.dispatcher.add_handler(MessageHandler(Filters.command, unknown))
#error logging handler
updater.dispatcher.add_error_handler(error)
# Run bot
updater.start_polling()
# Start monitoring in thread
global monitoring
monitoring = Monitoring(updater) # create parallel thread
monitoring.daemon = True # stop the thread if the main thread quits
monitoring.start()
print(threading.active_count())
# Stop the Bot when Ctrl+C received
updater.idle()
if __name__ == '__main__':
logger.info("Starting bot")
main()