Приведенный ниже код, который я изменил с Множественного наследования на Шаблон данных фасада, однако он все еще нарушает концепцию шаблона данных фасада, поскольку мои подсистемы (ES) делятся между собой. Все ES работают над структурированным потоком данных и обогащают данные вместе, все они запускаются в Asyn c и затем собираются в список (используя asyn c collect). Я хотел знать, какой шаблон данных подходит для этого
Использованного случая,
- Где я могу продолжать добавлять ES в соответствии с моим требованием.
- Каждый ES может обмениваться данными между сам по себе, как диктаторский.
- И если мне нужно добавить новую функциональность, я следую «Принципу единой ответственности»
Множественное наследование
import os
import asyncio
import psycopg2
import websockets
from datetime import datetime
from websockets.extensions import permessage_deflate
from structure import Structure
import sys
sys.path.append('..')
from event_stations.es3a import ES3A
from event_stations.es3b import ES3B
from event_stations.es3c import ES3C
from event_stations.es3d import ES3D
from event_stations.es1a import ES1A
from event_stations.es1b import ES1B
from event_stations.es2a import ES2A
from event_stations.es2b import ES2B
class FR_Server(ES1A, ES2A, ES2B, ES3A, ES3B, ES3C, ES3D, Structure):
unique_id = "100"
event_format_list = []
fr_config_table = 'detail_event.app_fer_config'
psql_db = None
def __init__(self):
print("Receiver Called INIT")
self.psql_db = self.connect_to_psql()
self._get_metadata()
super(FR_Server, self).__init__()
# self.start_receiver(self.mapped_dict)
def connect_to_psql(self):
db = psycopg2.connect("dbname=trimble user=postgres password=admin")
return db
def _get_parent_classname(self):
_parent_classes = []
_cls_obj = eval("FR_Server")
for parent in _cls_obj.__bases__:
_parent_classes.append(parent.__name__)
return _parent_classes
def _get_metadata(self):
_parents = self._get_parent_classname()
print(_parents, "pppp")
cur = self.psql_db.cursor()
cur.execute(f"Select * from {self.fr_config_table}")
results = cur.fetchall()
for result in results:
event_station_id = result[0]
if event_station_id in _parents:
event_station_classname = eval(event_station_id)
setattr(event_station_classname, "cache_table_name", result[1])
setattr(event_station_classname, "ignite_port", result[2])
setattr(event_station_classname, "ignite_host", result[3])
def get_port(self):
return os.getenv('WS_PORT', '10011')
def get_host(self):
return os.getenv('WS_HOST', 'localhost')
async def start(self):
return await websockets.serve(self.handler, self.get_host(), self.get_port(), ping_interval=None, max_size=None,
max_queue=None, close_timeout=None, extensions=[
permessage_deflate.ServerPerMessageDeflateFactory(
server_max_window_bits=11,
client_max_window_bits=11,
compress_settings={'memLevel': 4},
),
])
def generate_event_id(self, index):
return "".join(['%02d%02d%d%d%d%d%d' % (datetime.now().day, datetime.now().month, datetime.now().year,
datetime.now().hour,datetime.now().minute, datetime.now().second,
datetime.now().microsecond), self.unique_id,index])
async def handler(self, websocket, path):
async with websockets.connect('ws://localhost:10015', ping_interval=None, max_size=None,
max_queue=None, close_timeout=None,
extensions=[permessage_deflate.ClientPerMessageDeflateFactory(
server_max_window_bits=11,
client_max_window_bits=11,
compress_settings={'memLevel': 4},
),
]) as websocket_rb:
async for row in websocket:
lst_row = row.decode().split(",")
uid = self.generate_event_id(lst_row[0])
lst_row = [uid] + lst_row
results = await asyncio.gather(self.enrich_column_es3a_dict(lst_row[1]),
self.enrich_column_es3b_dict(lst_row[1]),
self.enrich_column_es3c_dict(lst_row[1]),
self.enrich_column_es3d_dict(lst_row[1]))
await websocket_rb.send(str(lst_row + results).encode())
def start_receiver(self, mapped_list):
self.event_format_list = mapped_list
asyncio.get_event_loop().run_until_complete(self.start())
asyncio.get_event_loop().run_forever()
Шаблон данных фасада:
from __future__ import annotations
from event_stations.es1a import ES1A
from event_stations.es2a import ES2A
from event_stations.es2b import ES2B
import psycopg2
class Foundation_Facade(object):
psql_db = None
client = None
def __init__(self, es1a: ES1A, es2a: ES2A) -> None:
self._es1a = es1a or ES1A()
self._es2a = es2a or ES2A()
def operation(self):
print("Called")
results = []
self.psql_db = self._es1a._connect_psql()
self._es1a._get_metadata(self.psql_db.cursor())
self.client = self._es1a.connect_ignite_client(self._es1a.ignite_host, self._es1a.ignite_port)
self._es2a._get_metadata(self.psql_db.cursor())
self._es2a.put_data(self.client)
self._es2b._get_metadata(self.psql_db.cursor())
self._es2b.put_data(self.client)
print(self._es2b.static_df.head())
# results.append(self._es1a._get_metadata())
return results
if __name__ == '__main__':
es1a = ES1A()
es2a = ES2A()
es2b = ES2B()
facade = Foundation_Facade(es1a, es2a)
from fr_server_1 import Server
Server(facade)