Бот Telegram с потоками - лимит потоков с одним экземпляром; правильный запуск / остановка потока - PullRequest
0 голосов
/ 29 марта 2020

Я пытаюсь создать бота для телеграмм, который будет вести мониторинг. Я провел некоторые исследования и эксперименты и думаю, что правильным способом для этого является использование потоков. В настоящее время я сделал более или менее работающий прототип, код ниже. И столкнулся со следующими проблемами, которые, надеюсь, вы можете помочь решить.

  1. Много времени я потратил, чтобы дать возможность правильно запустить и остановить мониторинг потока. И теперь я использую переменную Thread как глобальную, чтобы изменить ее статус. Это правильно, используя в моем коде? Может быть, есть лучшее решение для этого?

  2. Хотя это маловероятный случай, но я бы хотел ограничить поток только одним istance - чтобы он не дублировал. Так как весь мониторинг будет проходить в одном потоке. Может ли кто-нибудь помочь мне с этим?

  3. Может быть, есть какие-то другие критические проблемы в коде, которые болят глаза?

#!/usr/bin/python3
from telegram.ext import Updater, CommandHandler, Filters, MessageHandler, CallbackQueryHandler, PicklePersistence
from telegram import ReplyKeyboardMarkup, InlineKeyboardButton, InlineKeyboardMarkup, KeyboardButton
from telegram.error import NetworkError, Unauthorized, TimedOut
import logging
import os
from datetime import timedelta
import pickle
import numpy as np
import requests
import time
import json
import threading
from threading import Thread

dn = os.path.dirname(os.path.realpath(__file__))


# Token for telegram bot
TOKEN=xxx

# Pickle to store unique users
USERS_PICKLE = os.path.join(dn,"users.pkl")
# Pickle to store megarmarket and ashan notification users
SOME_USERS_PICKLE = os.path.join(dn,"some_users.pkl")

# Enable logging
logger = logging.getLogger(__name__) 
###...etc

#######################################################
# Class for monitoring thread
class Monitoring(Thread):
    def __init__(self, updater):
        super(Monitoring, self).__init__()
        self.running = True
        self.updater = updater

    def run(self):
        while self.running:
            # some monitoring - get request, check result.. etc. if Ok  - send message.
            try:
                with open(SOME_USERS_PICKLE, 'rb') as f:
                        some_registered_users = pickle.load(f)
            except:
                some_registered_users = {}

            del_plan = get_result()
            status = check_status(del_plan)
            if status[0]:
                for usr in some_registered_users.keys():
                    try:
                        self.updater.bot.send_message(chat_id=usr, text="XXX smthng{},{}".format(status[1],status[2]), disable_web_page_preview=True)
                    except Unauthorized:
                            logger.info("User have blocked bot: {},{}".format(usr, check_status_stores[usr])) 
                    except TimedOut:
                            logger.info("Message sending timed out..")                         
            else:
                for usr in some_registered_users.keys():
                    try:
                        self.updater.bot.send_message(chat_id=usr, text="?")
                    except Unauthorized:
                            logger.info("User blocked bot: {},{}".format(usr, check_status_stores[usr])) 
                    except TimedOut:
                            logger.info("Message sending timed out..") 

            # some another group monitoring..
            try:
                with open(SOME_USERS_PICKLE2, 'rb') as f:
                        some_registered_users = pickle.load(f)
            except:
                some_registered_users = {}

            del_plan = get_result_2()
            status = check_status_stores(del_plan)
            if status[0]:
                for usr in some_registered_users.keys():
                    try:
                        self.updater.bot.send_message(chat_id=usr, text="XXX smthng{},{}".format(status[1],status[2]), disable_web_page_preview=True)
                    except Unauthorized:
                            logger.info("User have blocked bot: {},{}".format(usr, check_status_stores[usr])) 
                    except TimedOut:
                            logger.info("Message sending timed out..")                         
            else:
                for usr in some_registered_users.keys():
                    try:
                        self.updater.bot.send_message(chat_id=usr, text="?")
                    except Unauthorized:
                            logger.info("User blocked bot: {},{}".format(usr, check_status_stores[usr])) 
                    except TimedOut:
                            logger.info("Message sending timed out..") 

##############################################################     



# Define command handlers
def start(update, context):
    logger.info("User {} started bot".format(update.effective_user["id"])) #update.message.chat_id
    update.message.reply_text('Hi, {}'.format(update.message.from_user.first_name))
    #custom keyborad
    custom_keyboard = [['/start', '/help'],['/monitor_']]  #'
    reply_markup = ReplyKeyboardMarkup(keyboard=custom_keyboard, resize_keyboard=True, one_time_keyboard=True)
    context.bot.send_message(chat_id=update.effective_chat.id,
                     text="Select '/monitor_'",
                     reply_markup=reply_markup)
    # save unique users to pickle (open existing)
    try:
        with open(USERS_PICKLE, 'rb') as f:
                registered_users = pickle.load(f)
    except:
        registered_users = {}
    registered_users[update.effective_chat.id] = (update.effective_user["first_name"], update.effective_user["last_name"])
    with open(USERS_PICKLE, 'wb') as f:
        pickle.dump(registered_users, f)
    logger.info("user dict: {}".format(registered_users))



# Stop monitoring Thread        
def stop_monitoring(update, context): 
    global monitoring  
    update.message.reply_text('End monitoring...')
    monitoring.running = False # stop the thread

# Get some data from monitoring Thread
# def get(update, context, t=monitoring):
#     update.message.reply_text('OK, the current value is {}'.format(t.data))


def help(update, context):
    context.bot.send_message(chat_id=update.effective_chat.id,
                            text=""" bla-bla""")



def select_(update, context):
    a_list = ['A', 'B', 'C', 'D']       
    inline_kb=[]
    crossIcon = u"\u274C"
    checkIcon = u"\u2705" 
    for s in a_list:
        inline_kb.append([InlineKeyboardButton(text=checkIcon+" "+s, callback_data="monitoring_ "+s), InlineKeyboardButton(text=crossIcon+" Unsubscribe "+s, callback_data="unsubscribe_ "+s)])       
    reply_markup = InlineKeyboardMarkup(inline_kb)
    update.message.reply_text('Оберіть магазин:', reply_markup=reply_markup)    

def register_monitoring_user(update, context):
    logger.info("User {} registered for {}".format(update.effective_user["id"],update.callback_query.data)) 
    query = update.callback_query
    code = query.data.split(' ')[-1]
    context.bot.answer_callback_query(query.id, "You are registering for {}".format(code))
    #query.edit_message_text(text="Prediction for: {}".format(query.data))
    if code=='A':
        # save users to pickle (open existing)
        try:
            with open(SOME_USERS_PICKLE, 'rb') as f:
                    registered_users = pickle.load(f)
        except:
            registered_users = {}
        registered_users[update.effective_chat.id] = (update.effective_user["first_name"], update.effective_user["last_name"])
        with open(SOME_USERS_PICKLE, 'wb') as f:
            pickle.dump(registered_users, f)
        logger.info("A user dict: {}".format(registered_users)) 

    elif code=='B':
    #.......

    context.bot.send_message(chat_id=update.effective_chat.id, text="?...".format(code))  


def unsubscribe_monitoring_user(update, context):
    logger.info("User {} unsubscribed from {}".format(update.effective_user["id"],update.callback_query.data)) 
    query = update.callback_query
    code = query.data.split(' ')[-1]
    context.bot.answer_callback_query(query.id, "You are unsubscribing..{}".format(code))
    if code=='A':
        # save users to pickle (open existing)
        try:
            with open(SOME_USERS_PICKLE, 'rb') as f:
                    registered_users = pickle.load(f)
                    registered_users.pop(update.effective_chat.id, None)
        except:
            registered_users = {}
        with open(SOME_USERS_PICKLE, 'wb') as f:
            pickle.dump(registered_users, f)
        logger.info("A user dict: {}".format(registered_users)) 

    elif code=='B':
    #.....
    context.bot.send_message(chat_id=update.effective_chat.id, text="Unsubscribed..".format(code))  


def unknown(update, context):
    context.bot.send_message(chat_id=update.effective_chat.id, text="WTF "+update.message.text+"? ?")
def error(update, context):
    """Log Errors caused by Updates."""
    logger.warning('Update "%s" caused error "%s"', update, context.error)



############ request processing
def get_result():
    url = "google.com"
    headers = {"authority":"xxx"
               ,"path":"/xxx3"
               ,"origin":"xxxx"
               ,"referer":"xxx"
               ,"user-agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36"
               ,"x-chain":"yyy"}
    try:
        response = requests.get(url, headers=headers) 
    except requests.exceptions.ConnectionError:
        print("Connection refused")
        return False    
    # print the response status code
    print(response.status_code)
    if response.status_code==200:
        try:
            return response.json()
        except json.decoder.JSONDecodeError:
            print("Response Not JSON: {}".format(response.text))
            return False
        except Exception as e:
            logging.error(e)
            return False
    else:
        logger.info("Error in response: {}".format(response.status_code)) 
        return False

def get_result_2():
    url = "google.com"
    headers = {"authority":"xxx"
               ,"path":"/xxx3"
               ,"origin":"xxxx"
               ,"referer":"xxx"
               ,"user-agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36"
               ,"x-chain":"yyy"}
    try:
        response = requests.get(url, headers=headers) 
    except requests.exceptions.ConnectionError:
        print("Connection refused")
        return False    
    # print the response status code
    print(response.status_code)
    if response.status_code==200:
        try:
            return response.json()
        except json.decoder.JSONDecodeError:
            print("Response Not JSON: {}".format(response.text))
            return False
        except Exception as e:
            logging.error(e)
            return False
    else:
        logger.info("Error in response: {}".format(response.status_code)) 
        return False


def check_status(json):
    if json==False:
        return(False, False, False)
    else:
        status = None
        for day in json:
            for slot in day[list(day.keys())[1]]:
                if slot['is_open']:
                    status = True
                    date = slot['date']
                    time = slot['time_range']
                    return(status, date, time)
        return(False, False, False)

############

def main():

    # Register handlers in Dispatcher
    start_handler = CommandHandler('start', start) # start handler
    updater.dispatcher.add_handler(start_handler)   # register start handler
    updater.dispatcher.add_handler(CommandHandler('help', help))
    updater.dispatcher.add_handler(CommandHandler('monitor_', select_))
    updater.dispatcher.add_handler(CommandHandler('stop_monitoring', stop_monitoring))
    updater.dispatcher.add_handler(CallbackQueryHandler(register_monitoring_user, pattern=r'^monitoring_ ')) 
    updater.dispatcher.add_handler(CallbackQueryHandler(unsubscribe_monitoring_user, pattern=r'^unsubscribe_ ')) 
    #
    updater.dispatcher.add_handler(MessageHandler(Filters.command, unknown))
    #error logging handler
    updater.dispatcher.add_error_handler(error)


    # Run bot
    updater.start_polling()

    # Start monitoring in thread
    global monitoring  
    monitoring = Monitoring(updater) # create parallel thread
    monitoring.daemon = True # stop the thread if the main thread quits
    monitoring.start()
    print(threading.active_count())



    # Stop the Bot when Ctrl+C received
    updater.idle()


if __name__ == '__main__':
    logger.info("Starting bot")
    main()


...