# Question: why does this code not close the database connection?
# (Observed with `SELECT * FROM information_schema.PROCESSLIST;`.)
#
# Short answer, implemented in `Database.close()` below: `Session.close()`
# only returns the connection to the engine's pool, and `Engine.dispose()`
# only closes connections that are currently checked in. The session must be
# closed first and the engine disposed afterwards.
import importlib
import re
from typing import Any

from flask import g, current_app
from sqlalchemy import create_engine, URL, delete, update, select, exists
from sqlalchemy.orm import sessionmaker, scoped_session

from core.database.base import Base
from lib.type import Type


class Database:
    """Database helper whose engine/session are cached on Flask's ``g``.

    ``ENV`` selects which container on ``g`` is used ("Application" or
    "Platform"); with any other value no container exists.
    """

    ENV = None

    def set(self, key: str, value: Any):
        """Store ``value`` under field ``key`` of the current container.

        The container is replaced via ``_replace`` and re-assigned to ``g``.
        Does nothing when ``ENV`` is neither "Application" nor "Platform".
        Original note: main use case is the token guard setup.
        """
        if self.ENV == "Application":
            g.Application = self.container._replace(**{key: value})
        if self.ENV == 'Platform':
            g.Platform = self.container._replace(**{key: value})

    @property
    def container(self):
        """Return the container for ``ENV`` from ``g``, creating it if absent.

        Returns ``None`` when ``ENV`` is neither "Application" nor "Platform".
        """
        if self.ENV == "Application":
            if "Application" not in g:
                g.Application = Type.Application(None, None, None)
            return g.Application
        if self.ENV == 'Platform':
            if "Platform" not in g:
                g.Platform = Type.Platform(None, None)
            return g.Platform
        return None

    @property
    def database_conf(self):
        """Return the database settings built from ``current_app.config["Database"]``."""
        return Base.setting(current_app.config["Database"])

    @property
    def __database_core(self):
        """Build a fresh engine/session pair from ``database_conf``."""
        return self.__create_session(**self.database_conf)

    @property
    def __create_engine(self):
        """Create a new engine, cache it under "engine" and return it."""
        core = self.__database_core
        self.set("engine", core.engine)
        return core.engine

    @property
    def __create_database(self):
        """Create a new session, cache it under "database" and return it."""
        core = self.__database_core
        self.set("database", core.session)
        return core.session

    def __create_session(self, **config):
        """Create an engine from ``config`` and a session bound to it."""
        engine = self.create_engine(**config)
        session = scoped_session(sessionmaker(bind=engine, autoflush=True))
        return Type.Database(engine=engine, session=session())

    @classmethod
    def create_engine(cls, **kwargs):
        """Create a ``mysql+pymysql`` engine; ``kwargs`` go to ``URL.create``.

        The engine runs with ``isolation_level="AUTOCOMMIT"``.
        """
        # NOTE(review): echo=True logs every statement - confirm this is
        # intended outside of debugging.
        return create_engine(
            URL.create("mysql+pymysql", **kwargs),
            echo=True,
            isolation_level="AUTOCOMMIT",
        )

    @staticmethod
    def create_all(models: list, engine=None):
        """Create the tables of every model named in ``models``."""
        tables = [Database.get_model(model).__table__ for model in models]
        Base.metadata.create_all(bind=engine, tables=tables)

    def create_table(self, tables: list):
        """Create the tables of the named models on this instance's engine."""
        # Reuse the engine cached by an earlier call instead of building a
        # new engine (and a new connection pool) every time.
        engine = getattr(self.container, "engine", None)
        if engine is None:
            engine = self.__create_engine
        Database.create_all(models=tables, engine=engine)

    @staticmethod
    def get_model(model: str):
        """Import the model module for ``model`` and return a new instance.

        The module path is ``model.<text before the first "_">.<model>``.
        The class name is the title-cased last dotted segment of ``model``
        with all non-letters removed, e.g. ``user_info`` -> ``UserInfo``.
        """
        module = importlib.import_module(f"model.{model.split('_')[0]}.{model}")
        class_name = ''.join(re.findall(r"[a-zA-Z]+", model.split(".")[-1].title()))
        return getattr(module, class_name)()

    @property
    def database(self):
        """Return the cached session, creating and caching one if needed."""
        session = getattr(self.container, "database")
        return self.__create_database if session is None else session

    def table_data_query_all(
        self,
        model: Any,
        condition: list = None,
        order: list = None,
        limit: int = 500,
        fields: list = None,
    ) -> list[dict]:
        """Query up to ``limit`` rows and return each as ``row.dict()``.

        ``fields`` restricts the selected columns, ``condition`` is passed to
        ``filter`` and ``order`` to ``order_by``; each is skipped when None.
        """
        query = select(model)
        if fields is not None:
            query = query.with_only_columns(*fields)
        if condition is not None:
            query = query.filter(*condition)
        if order is not None:
            query = query.order_by(*order)
        # No engine.dispose() here: while the session still holds its
        # connection, dispose() does not close it and only detaches it from
        # the engine, so a later dispose() in close() could not reach it.
        return [row.dict() for row in self.database.execute(query.limit(limit)).scalars()]

    def table_data_query_one(self, model: Any, condition: list = None) -> dict | None:
        """Query a single row; return ``row.dict()`` or None if nothing matches."""
        query = select(model)
        # `condition` defaults to None; unpacking None would raise TypeError.
        if condition is not None:
            query = query.filter(*condition)
        result = self.database.execute(query.limit(1)).scalar_one_or_none()
        return None if result is None else result.dict()

    def table_data_query_exists(self, condition: list) -> bool:
        """Return the scalar result of an EXISTS query over ``condition``."""
        return self.database.query(exists().where(*condition)).scalar()

    def table_data_insert_all(self, models: list) -> None:
        """Insert several model instances and commit."""
        with self.database as db:
            db.add_all(models)
            db.commit()

    def table_data_insert_one(self, model, data: bool = False) -> int | dict:
        """Insert one model instance and commit.

        Returns ``model.id`` by default, or ``model.dict()`` when ``data`` is
        True.
        """
        with self.database as db:
            db.add(model)
            db.commit()
            # Read the result inside the block, before the session is closed.
            return model.dict() if data is True else model.id

    def table_data_update(self, model: Any, condition: list, data: dict) -> None:
        """Update the rows matching ``condition`` with the values in ``data``."""
        # No explicit commit: the engine is created with AUTOCOMMIT.
        with self.database as db:
            db.execute(update(model).where(*condition).values(**data))

    def table_data_delete(self, model: Any, condition: list) -> None:
        """Delete the rows matching ``condition``."""
        # No explicit commit: the engine is created with AUTOCOMMIT.
        with self.database as db:
            db.execute(delete(model).where(*condition))

    def close(self):
        """Close the cached session and dispose the cached engine(s).

        Reads the cache directly rather than through ``self.database``, which
        would create a new engine and session when nothing is cached.
        """
        container = self.container
        session = getattr(container, "database", None)
        engine = getattr(container, "engine", None)

        bind = None
        if session is not None:
            bind = session.get_bind()
            # Order matters: close the session first so its connection is
            # back in the pool, then dispose the pool to really close it.
            session.close()
            bind.dispose()

        # The engine cached by create_table is separate from the session's.
        if engine is not None and engine is not bind:
            engine.dispose()