diff --git a/backend/common/log.py b/backend/common/log.py index 69ebeb0e..d3013f09 100644 --- a/backend/common/log.py +++ b/backend/common/log.py @@ -94,7 +94,7 @@ def set_custom_logfile() -> None: log_config = { 'format': settings.LOG_FILE_FORMAT, 'enqueue': True, - 'rotation': '5 MB', + 'rotation': '00:00', 'retention': '7 days', 'compression': 'tar.gz', } diff --git a/backend/core/conf.py b/backend/core/conf.py index 1da79057..64b84295 100644 --- a/backend/core/conf.py +++ b/backend/core/conf.py @@ -53,8 +53,8 @@ class Settings(BaseSettings): FASTAPI_STATIC_FILES: bool = True # 数据库 - DATABASE_ECHO: bool = False - DATABASE_POOL_ECHO: bool = False + DATABASE_ECHO: bool | Literal['debug'] = False + DATABASE_POOL_ECHO: bool | Literal['debug'] = False DATABASE_SCHEMA: str = 'fba' DATABASE_CHARSET: str = 'utf8mb4' @@ -115,7 +115,6 @@ class Settings(BaseSettings): # 中间件配置 MIDDLEWARE_CORS: bool = True - MIDDLEWARE_ACCESS: bool = True # 请求限制配置 REQUEST_LIMITER_REDIS_PREFIX: str = 'fba:limiter' @@ -151,18 +150,14 @@ class Settings(BaseSettings): LOG_CID_DEFAULT_VALUE: str = '-' LOG_CID_UUID_LENGTH: int = 32 # 日志 correlation_id 长度,必须小于等于 32 LOG_STD_LEVEL: str = 'INFO' - LOG_ACCESS_FILE_LEVEL: str = 'INFO' - LOG_ERROR_FILE_LEVEL: str = 'ERROR' LOG_STD_FORMAT: str = ( - '{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | ' - ' {correlation_id} | {message}' - ) - LOG_FILE_FORMAT: str = ( - '{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | ' - ' {correlation_id} | {message}' + '{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {correlation_id} | {message}' ) + LOG_ACCESS_FILE_LEVEL: str = 'INFO' + LOG_ERROR_FILE_LEVEL: str = 'ERROR' LOG_ACCESS_FILENAME: str = 'fba_access.log' LOG_ERROR_FILENAME: str = 'fba_error.log' + LOG_FILE_FORMAT: str = '{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {correlation_id} | {message}' # 操作日志 OPERA_LOG_PATH_EXCLUDE: list[str] = [ diff --git a/backend/core/registrar.py b/backend/core/registrar.py index 90ea6017..439a2420 100644 --- a/backend/core/registrar.py +++ b/backend/core/registrar.py @@ -20,6 +20,7 @@ from backend.core.path_conf import STATIC_DIR, UPLOAD_DIR from backend.database.db import create_table from backend.database.redis import redis_client +from backend.middleware.access_middleware import AccessMiddleware from backend.middleware.jwt_auth_middleware import JwtAuthMiddleware from backend.middleware.opera_log_middleware import OperaLogMiddleware from backend.middleware.state_middleware import StateMiddleware @@ -69,8 +70,8 @@ def register_app() -> FastAPI: ) # 注册组件 - register_socket_app(app) register_logger() + register_socket_app(app) register_static_file(app) register_middleware(app) register_router(app) @@ -110,29 +111,20 @@ def register_middleware(app: FastAPI) -> None: :param app: FastAPI 应用实例 :return: """ - # Opera log (必须) + # Opera log app.add_middleware(OperaLogMiddleware) - # JWT auth (必须) + # State + app.add_middleware(StateMiddleware) + + # JWT auth app.add_middleware( AuthenticationMiddleware, backend=JwtAuthMiddleware(), on_error=JwtAuthMiddleware.auth_exception_handler, ) - # Access log - if settings.MIDDLEWARE_ACCESS: - from backend.middleware.access_middleware import AccessMiddleware - - app.add_middleware(AccessMiddleware) - - # State - app.add_middleware(StateMiddleware) - - # Trace ID (必须) - app.add_middleware(CorrelationIdMiddleware, validator=False) - - # CORS(必须放在最下面) + # CORS if settings.MIDDLEWARE_CORS: from fastapi.middleware.cors import CORSMiddleware @@ -145,6 +137,12 @@ def register_middleware(app: FastAPI) -> None: expose_headers=settings.CORS_EXPOSE_HEADERS, ) + # Access log + app.add_middleware(AccessMiddleware) + + # Trace ID + app.add_middleware(CorrelationIdMiddleware, validator=False) + def register_router(app: FastAPI) -> None: """ diff --git a/backend/main.py b/backend/main.py index 7de536aa..c6ff227d 100644 --- a/backend/main.py +++ b/backend/main.py @@ -8,14 +8,14 @@ from backend.plugin.tools import get_plugins, install_requirements from backend.utils.timezone import timezone -_print_log_style = f'{timezone.to_str(timezone.now(), "%Y-%m-%d %H:%M:%S.%M0")} | fba | - | ' -console.print(Text(f'{_print_log_style}检测插件依赖...', style='bold cyan')) +_log_prefix = f'{timezone.to_str(timezone.now(), "%Y-%m-%d %H:%M:%S.%M0")} | {"INFO": <8} | - | ' +console.print(Text(f'{_log_prefix}检测插件依赖...', style='bold cyan')) _plugins = get_plugins() with Progress( - SpinnerColumn(finished_text=f'[bold green]{_print_log_style}插件准备就绪[/]'), - TextColumn('[bold green]{task.completed}/{task.total}[/]'), + SpinnerColumn(finished_text=f'[bold green]{_log_prefix}插件准备就绪[/]'), + TextColumn('{task.completed}/{task.total}', style='bold green'), TimeElapsedColumn(), console=console, ) as progress: @@ -24,6 +24,6 @@ install_requirements(plugin) progress.advance(task) -console.print(Text(f'{_print_log_style}启动服务...', style='bold magenta')) +console.print(Text(f'{_log_prefix}启动服务...', style='bold magenta')) app = register_app() diff --git a/backend/middleware/access_middleware.py b/backend/middleware/access_middleware.py index 61fa38a4..71f7d697 100644 --- a/backend/middleware/access_middleware.py +++ b/backend/middleware/access_middleware.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +import time + from fastapi import Request, Response from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint @@ -8,7 +10,7 @@ class AccessMiddleware(BaseHTTPMiddleware): - """请求日志中间件""" + """访问日志中间件""" async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response: """ @@ -18,11 +20,25 @@ async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) - :param call_next: 下一个中间件或路由处理函数 :return: """ + path = request.url.path if not request.url.query else request.url.path + '/' + request.url.query + + if request.method != 'OPTIONS': + log.info(f'--> 请求开始[{path}]') + log.info(f'请求方式:[{request.method}]') + + perf_time = time.perf_counter() + request.state.perf_time = perf_time + start_time = timezone.now() + request.state.start_time = start_time + response = await call_next(request) - end_time = timezone.now() - log.info( - f'{request.client.host: <15} | {request.method: <8} | {response.status_code: <6} | ' - f'{request.url.path} | {round((end_time - start_time).total_seconds(), 3) * 1000.0}ms' - ) + + elapsed = (time.perf_counter() - perf_time) * 1000 + + if request.method != 'OPTIONS': + log.info(f'接口耗时:[{elapsed:.3f}]ms') + log.info(f'接口状态:[{response.status_code}]') + log.info('<-- 请求结束') + return response diff --git a/backend/middleware/opera_log_middleware.py b/backend/middleware/opera_log_middleware.py index 3ac0477e..43eea950 100644 --- a/backend/middleware/opera_log_middleware.py +++ b/backend/middleware/opera_log_middleware.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +import time + from asyncio import create_task from typing import Any @@ -11,12 +13,10 @@ from backend.app.admin.schema.opera_log import CreateOperaLogParam from backend.app.admin.service.opera_log_service import opera_log_service -from backend.common.dataclasses import RequestCallNext from backend.common.enums import OperaLogCipherType, StatusType from backend.common.log import log from backend.core.conf import settings from backend.utils.encrypt import AESCipher, ItsDCipher, Md5Cipher -from backend.utils.timezone import timezone from backend.utils.trace_id import get_request_trace_id @@ -31,113 +31,89 @@ async def dispatch(self, request: Request, call_next: Any) -> Response: :param call_next: 下一个中间件或路由处理函数 :return: """ - # 排除记录白名单 + response = None path = request.url.path - if path in settings.OPERA_LOG_PATH_EXCLUDE or not path.startswith(f'{settings.FASTAPI_API_V1_PATH}'): - return await call_next(request) - - # 请求解析 - try: - # 此信息依赖于 jwt 中间件 - username = request.user.username - except AttributeError: - username = None - method = request.method - args = await self.get_request_args(request) - args = await self.desensitization(args) - - # 执行请求 - start_time = timezone.now() - request_next = await self.execute_request(request, call_next) - end_time = timezone.now() - cost_time = round((end_time - start_time).total_seconds() * 1000.0, 3) - - # 此信息只能在请求后获取 - _route = request.scope.get('route') - summary = getattr(_route, 'summary', None) or '' - - # 日志创建 - opera_log_in = CreateOperaLogParam( - trace_id=get_request_trace_id(request), - username=username, - method=method, - title=summary, - path=path, - ip=request.state.ip, - country=request.state.country, - region=request.state.region, - city=request.state.city, - user_agent=request.state.user_agent, - os=request.state.os, - browser=request.state.browser, - device=request.state.device, - args=args, - status=request_next.status, - code=request_next.code, - msg=request_next.msg, - cost_time=cost_time, - opera_time=start_time, - ) - create_task(opera_log_service.create(obj=opera_log_in)) # noqa: ignore - - # 错误抛出 - if request_next.err: - raise request_next.err from None - - return request_next.response - - async def execute_request(self, request: Request, call_next: Any) -> RequestCallNext: - """ - 执行请求并处理异常 - :param request: FastAPI 请求对象 - :param call_next: 下一个中间件或路由处理函数 - :return: - """ - code = 200 - msg = 'Success' - status = StatusType.enable - err = None - response = None - try: + if path in settings.OPERA_LOG_PATH_EXCLUDE or not path.startswith(f'{settings.FASTAPI_API_V1_PATH}'): response = await call_next(request) - code, msg = self.request_exception_handler(request, code, msg) - except Exception as e: - log.error(f'请求异常: {str(e)}') - # code 处理包含 SQLAlchemy 和 Pydantic - code = getattr(e, 'code', code) - msg = getattr(e, 'msg', msg) - status = StatusType.disable - err = e - - return RequestCallNext(code=str(code), msg=msg, status=status, err=err, response=response) - - @staticmethod - def request_exception_handler(request: Request, code: int, msg: str) -> tuple[str, str]: - """ - 请求异常处理器 - - :param request: FastAPI 请求对象 - :param code: 错误码 - :param msg: 错误信息 - :return: - """ - exception_states = [ - '__request_http_exception__', - '__request_validation_exception__', - '__request_assertion_error__', - '__request_custom_exception__', - '__request_all_unknown_exception__', - '__request_cors_500_exception__', - ] - for state in exception_states: - exception = getattr(request.state, state, None) - if exception: - code = exception.get('code') - msg = exception.get('msg') - log.error(f'请求异常: {msg}') - break - return code, msg + else: + method = request.method + args = await self.get_request_args(request) + args = await self.desensitization(args) + + # 执行请求 + elapsed = 0.0 + code = 200 + msg = 'Success' + status = StatusType.enable + error = None + try: + response = await call_next(request) + elapsed = (time.perf_counter() - request.state.perf_time) * 1000 + for state in [ + '__request_http_exception__', + '__request_validation_exception__', + '__request_assertion_error__', + '__request_custom_exception__', + '__request_all_unknown_exception__', + '__request_cors_500_exception__', + ]: + exception = getattr(request.state, state, None) + if exception: + code = exception.get('code') + msg = exception.get('msg') + log.error(f'请求异常: {msg}') + break + except Exception as e: + log.error(f'请求异常: {str(e)}') + code = getattr(e, 'code', code) # 兼容 SQLAlchemy 异常用法 + msg = getattr(e, 'msg', msg) + status = StatusType.disable + error = e + + # 此信息只能在请求后获取 + _route = request.scope.get('route') + summary = getattr(_route, 'summary', '') + + try: + # 此信息来源于 JWT 认证中间件 + username = request.user.username + except AttributeError: + username = None + + # 日志记录 + log.debug(f'请求地址:[{request.state.ip}]') + log.debug(f'请求参数:{args}') + + # 日志创建 + opera_log_in = CreateOperaLogParam( + trace_id=get_request_trace_id(request), + username=username, + method=method, + title=summary, + path=path, + ip=request.state.ip, + country=request.state.country, + region=request.state.region, + city=request.state.city, + user_agent=request.state.user_agent, + os=request.state.os, + browser=request.state.browser, + device=request.state.device, + args=args, + status=status, + code=str(code), + msg=msg, + cost_time=elapsed, # 可能和日志存在微小差异(可忽略) + opera_time=request.state.start_time, + ) + create_task(opera_log_service.create(obj=opera_log_in)) # noqa: ignore + + # 错误抛出 + if error: + raise error from None + + return response @staticmethod async def get_request_args(request: Request) -> dict[str, Any]: @@ -147,25 +123,39 @@ async def get_request_args(request: Request) -> dict[str, Any]: :param request: FastAPI 请求对象 :return: """ - args = dict(request.query_params) - args.update(request.path_params) + args = {} + query_params = dict(request.query_params) + if query_params: + args['query_params'] = query_params + path_params = request.path_params + if path_params: + args['path_params'] = path_params # Tip: .body() 必须在 .form() 之前获取 # https://github.com/encode/starlette/discussions/1933 + content_type = request.headers.get('Content-Type', '').split(';') body_data = await request.body() - form_data = await request.form() - if len(form_data) > 0: - args.update({k: v.filename if isinstance(v, UploadFile) else v for k, v in form_data.items()}) - elif body_data: - content_type = request.headers.get('Content-Type', '').split(';') - if 'application/json' in content_type: + if body_data: + # 注意:非 json 数据默认使用 body 作为键 + if 'application/json' not in content_type: + args['data'] = str(body_data) + else: json_data = await request.json() if isinstance(json_data, dict): - args.update(json_data) + args['json'] = json_data + else: + args['data'] = str(body_data) + form_data = await request.form() + if len(form_data) > 0: + for k, v in form_data.items(): + if isinstance(v, UploadFile): + form_data = {k: v.filename} else: - # 注意:非字典数据默认使用 body 作为键 - args.update({'body': str(body_data)}) + form_data = {k: v} + if 'multipart/form-data' not in content_type: + args['x-www-form-urlencoded'] = form_data else: - args.update({'body': str(body_data)}) + args['form-data'] = form_data + return args @staticmethod diff --git a/backend/middleware/state_middleware.py b/backend/middleware/state_middleware.py index b7307524..ee9069ea 100644 --- a/backend/middleware/state_middleware.py +++ b/backend/middleware/state_middleware.py @@ -7,7 +7,7 @@ class StateMiddleware(BaseHTTPMiddleware): - """请求 state 中间件,用于解析和设置请求的附加信息""" + """请求状态中间件""" async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response: """ @@ -18,13 +18,12 @@ async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) - :return: """ ip_info = await parse_ip_info(request) - ua_info = parse_user_agent_info(request) - - # 设置附加请求信息 request.state.ip = ip_info.ip request.state.country = ip_info.country request.state.region = ip_info.region request.state.city = ip_info.city + + ua_info = parse_user_agent_info(request) request.state.user_agent = ua_info.user_agent request.state.os = ua_info.os request.state.browser = ua_info.browser