request-id实现
RequestID实现
为单次请求赋予一个唯一的RequestID可以方便定位问题,获取调用栈和日志。比较常见的场景为,接收到请求后,为请求分配RequestID,本次请求产生的所有日志、第三方调用等均关联此RequestID,该RequestID在接口的返回体或者返回头中返回给调用者。
实现思路,常见的RequestID一般是在请求进入时分配并存入上下文(context)变量中。第一次实现的时候是将RequestID记录在了request.scope对象里,每次获取RequestID时均需要获取request对象,十分不方便。这里直接介绍当前使用的方案:借助 starlette-context 工具库,此库已经实现了RequestID的生成和关联,在这里继续介绍RequestID在返回体中的自动返回和日志关联(可关联至gunicorn的AccessLog)。
requirements.txt
common/request_id.py
import logging
from starlette_context import context
from starlette_context.plugins.plugin_uuid import PluginUUIDBase
# Key under which the request id is stored in the starlette-context data.
REQUEST_ID_CTX_KEY = 'X-Request-ID'
class RequestIdPlugin(PluginUUIDBase):
    """Request-id plugin whose value is kept under ``REQUEST_ID_CTX_KEY``."""

    key = REQUEST_ID_CTX_KEY

    def get_new_uuid(self) -> str:
        """Generate a fresh id with the uuid function mapped to ``self.version``.

        :return: string form of the generated uuid
        """
        return str(self.uuid_functions_mapper[self.version]())
def current_request_id():
    """Return the request id stored in the current request context."""
    request_context = context.data
    return request_context[REQUEST_ID_CTX_KEY]
class RequestIDLogFilter(logging.Filter):
    """Logging filter that attaches the current request id to each record.

    The id is exposed as ``log_record.request_id`` so a format string can
    reference it with ``%(request_id)s``.
    """

    def filter(self, log_record):
        """Set ``request_id`` on *log_record* and let the record through.

        :param log_record: the record being filtered
        :return: the record itself (truthy, so the record is kept)
        """
        request_id = current_request_id()
        log_record.request_id = request_id
        return log_record
主要修改点:
- RequestIdPlugin 重写了 get_new_uuid 函数,调整了下生成的uuid格式。
- current_request_id会自动返回本次请求对应的request_id
- RequestIDLogFilter 用于为日志对象添加request_id值
为App新增中间件 main.py
from starlette_context.middleware import ContextMiddleware
from common.request_id import RequestIdPlugin
def get_application() -> FastAPI:
    """Build the FastAPI application with the context middleware and API routes."""
    app = FastAPI(title="Demo", debug=config.DEBUG, version='1.0')

    # Middleware: the context middleware runs the request-id plugin.
    request_plugins = (RequestIdPlugin(),)
    app.add_middleware(ContextMiddleware, plugins=request_plugins)
    ...
    app.include_router(api_router, prefix='/api')
    return app
接口自动返回RequestID
common/response.py
from typing import Any, List
from pydantic import BaseModel, Field
from common.request_id import current_request_id
# Common envelope returned by the API endpoints.
# NOTE: the class docstring below is left untranslated on purpose; pydantic
# exposes a model's docstring as the schema description, so it is runtime text.
class Response(BaseModel):
    """接口返回格式"""

    # Numeric response code (required).
    code: int = Field(default=..., example=0, description='响应码')
    # Short status text (required).
    message: str = Field(
        default=...,
        example="success",
        description='响应码解释,success或者failed。',
    )
    # Explanation of a failure; defaults to an empty string.
    cause: Any = Field(default='', example="资源未找到", description='异常解释。')
    # Filled from the current request context when not supplied explicitly.
    request_id: str = Field(
        default_factory=current_request_id,
        example="04950cb5-6b24-426b-a5ce-db6344982597",
        description='RequestId,用于定位单次请求。',
    )
    # Endpoint-specific payload (required).
    result: Any = Field(default=..., example="", description='响应数据,数据格式根据API变化。')
views.py
@router.get("/demo", response_model=Response)
async def demo():
    """Demo endpoint returning a fixed success payload.

    ``request_id`` is not passed here; the ``Response`` model fills it in
    through its ``default_factory``.
    """
    # The payload is plain 'demo' (no trailing full-width period) so that it
    # matches the sample response shown for this endpoint.
    return Response(message='success', code=0, result='demo')
单次请求的返回如下:
{'cause': '',
'code': 0,
'message': 'success',
'request_id': '99cf0bf2-42a0-4ed4-a396-da611a370d06',
'result': 'demo'}
日志关联
common/constants.py
# Plain log line: timestamp, pid, source location, level, message.
LOG_FORMAT = (
    '%(asctime)s [pid:%(process)d] '
    '%(filename)s(line:%(lineno)d) '
    '%(levelname)s %(message)s'
)
# Same layout with the request id inserted in brackets before the message.
REQUEST_ID_LOG_FORMAT = (
    '%(asctime)s [pid:%(process)d] '
    '%(filename)s(line:%(lineno)d) '
    '%(levelname)s [%(request_id)s] %(message)s'
)
common/log_handler.py
import json
import logging
import os
from copy import copy
from logging.handlers import WatchedFileHandler
from uvicorn.logging import AccessFormatter
import config
from common.constants import LOG_FORMAT, REQUEST_ID_LOG_FORMAT
from common.request_id import RequestIDLogFilter, current_request_id
from common.request_time import current_request_time
class LogHandler(logging.Logger):
    """
    Logger that attaches its own stream and/or file handler on creation.

    Every handler is formatted with ``log_format``; subclasses may override it.
    """
    log_format = LOG_FORMAT

    def __init__(self, name, level=None, stream=True, file=True):
        """
        create the logger and attach the requested handlers
        :param name: logger name, also used as the log file base name
        :param level: logger level; a falsy value falls back to config.LOG_LEVEL
        :param stream: attach a StreamHandler when true
        :param file: attach a WatchedFileHandler when true
        """
        resolved_level = level if level else config.LOG_LEVEL
        super().__init__(name, level=resolved_level)
        # Always defined, so resetName() is safe for loggers built with file=False.
        self.file_handler = None
        if stream:
            self.__setStreamHandler__()
        if file:
            self.__setFileHandler__()

    def __setFileHandler__(self, level=None):
        """
        set file handler
        :param level: handler level; a falsy value falls back to the logger level
        :return:
        """
        # NOTE(review): LOG_PATH is not among the imports visible in this
        # module snippet - confirm where it is defined.
        file_name = os.path.join(LOG_PATH, f'{self.name}.log')
        file_handler = WatchedFileHandler(filename=file_name)
        file_handler.setLevel(level if level else self.level)
        file_handler.setFormatter(logging.Formatter(self.log_format))
        self.file_handler = file_handler
        self.addHandler(file_handler)

    def __setStreamHandler__(self, level=None):
        """
        set stream handler
        :param level: handler level; a falsy value falls back to the logger level
        :return:
        """
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(logging.Formatter(self.log_format))
        stream_handler.setLevel(level if level else self.level)
        self.addHandler(stream_handler)

    def resetName(self, name):
        """
        reset name and, if a file handler exists, point it at the new log file
        :param name: new logger name
        :return:
        """
        self.name = name
        previous_handler = getattr(self, 'file_handler', None)
        if previous_handler is None:
            # No file logging was configured; there is nothing to re-create.
            return
        self.removeHandler(previous_handler)
        # Close the replaced handler so its open log file is released.
        previous_handler.close()
        self.__setFileHandler__()
class RequestLogHandler(LogHandler):
    """
    LogHandler variant whose records carry the current request id.

    Uses REQUEST_ID_LOG_FORMAT and installs a RequestIDLogFilter on the logger.
    """
    log_format = REQUEST_ID_LOG_FORMAT

    def __setFileHandler__(self, level=None):
        """
        set file handler and make sure the request-id filter is installed
        :param level:
        :return:
        """
        super().__setFileHandler__(level=level)
        self._ensure_request_id_filter()

    def __setStreamHandler__(self, level=None):
        """
        set stream handler and make sure the request-id filter is installed
        :param level:
        :return:
        """
        super().__setStreamHandler__(level=level)
        self._ensure_request_id_filter()

    def _ensure_request_id_filter(self):
        """Add one RequestIDLogFilter to this logger unless one is already present."""
        # Both handler setups run on one logger (and the file setup can run
        # again later); adding a new filter instance each time would stack
        # duplicates that all do the same work for every record.
        if any(isinstance(existing, RequestIDLogFilter) for existing in self.filters):
            return
        self.addFilter(RequestIDLogFilter())
views.py:
# Module-level logger whose output includes the request id.
log = RequestLogHandler('demo')


@router.get("/demo", response_model=Response)
async def demo():
    """Demo endpoint: write one log line, then return a fixed success payload."""
    log.info('demo')
    payload = Response(message='success', code=0, result='demo')
    return payload
输出日志:
2020-10-01 16:27:53,065 [pid:19120] views.py(line:26) INFO [d42f6cfb-2a0f-4673-a3c9-d8c00d2e58b4] demo
关联AccessLog
FastAPI当前使用的部署方案是 nginx->gunicorn->uvicorn->fastapi,nginx做负载均衡,gunicorn负责管理调度uvicorn,uvicorn再将具体请求传递给fastapi。gunicorn部署的时候一般指定worker type为uvicorn.workers.UvicornWorker,具体可以参考 uvicorn-deployment。为了让Access日志能够显示RequestID,需要对默认的Worker进行重写。
common/workers.py
import logging
from uvicorn.workers import UvicornWorker
from common import constants
from common.log_handler import RequestAccessFormatter
class RequestUvicornWorker(UvicornWorker):
    """Custom worker that emits access logs in the configured format."""

    def __init__(self, *args, **kwargs):
        """Initialise the worker, then install the access formatter and handlers."""
        super().__init__(*args, **kwargs)

        # Give every gunicorn access-log handler the request-aware formatter.
        access_handlers = self.log.access_log.handlers
        for access_handler in access_handlers:
            access_handler.setFormatter(RequestAccessFormatter(constants.ACCESS_LOG_FORMAT))

        # Make the "uvicorn.access" logger use those handlers and that level.
        uvicorn_access = logging.getLogger("uvicorn.access")
        uvicorn_access.handlers = access_handlers
        uvicorn_access.setLevel(self.log.access_log.level)
common/log_handler.py
from common.request_time import current_request_time
from uvicorn.logging import AccessFormatter
class RequestAccessFormatter(AccessFormatter):
    """Access-log formatter that exposes ``request_id`` and never uses colors."""

    def __init__(self, fmt=None, datefmt=None, style="%", use_colors=None):
        """Initialise the formatter; ``use_colors`` is accepted but colors stay off."""
        super().__init__(fmt=fmt, datefmt=datefmt, style=style)
        self.use_colors = False

    def get_status_code(self, record):
        """Return the record's ``status_code`` value unchanged."""
        return record.__dict__["status_code"]

    def formatMessage(self, record):
        """Format a copy of *record* extended with the current request id."""
        enriched = copy(record)
        enriched.request_id = current_request_id()
        return super().formatMessage(enriched)
common/constants.py
# Access-log line: timestamp, request id, status code, client address, request line.
ACCESS_LOG_FORMAT = (
    '[%(asctime)s] %(request_id)s %(status_code)s %(client_addr)s '
    '- "%(request_line)s"'
)
gunicorn使用的配置文件 gun.py
import multiprocessing
import config as local_config
# Listen address, built from the host and port in the local config module.
bind = f'{local_config.HOST}:{local_config.PORT}'
# Two workers per CPU core.
workers = multiprocessing.cpu_count() * 2
backlog = 2048
# Dotted path of the custom worker class used for this deployment.
worker_class = 'common.workers.RequestUvicornWorker'
proc_name = 'gunicorn.proc'
# PID file and log files live under the relative log/ directory.
pidfile = 'log/gunicorn.pid'
errorlog = 'log/error.log'
accesslog = 'log/access.log'
loglevel = 'info'
# Daemonize the gunicorn process.
daemon = True
# NOTE(review): per the gunicorn settings reference, timeout is in seconds and
# limit_request_line = 0 means "unlimited" - confirm for the deployed version.
timeout = 10
limit_request_line = 0
使用gunicorn启动服务后,access.log展示的日志格式将如下:
[2020-09-30 15:28:54,793] ff1216ac-f46e-4997-901b-c749c306e744 200 192.168.11.1:53607 - "GET /redoc HTTP/1.1"
ff1216ac-f46e-4997-901b-c749c306e744 就是本次请求对应的RequestID。
RequestTime实现
Gunicorn部署Flask的时候,是可以自动计算请求耗时的。但是使用Gunicorn+uvicorn之后,Access日志是由uvicorn负责打印的,uvicorn(uvicorn==0.11.8)当前好像还是不支持请求耗时计算。查看单次请求的耗时是个刚需,和RequestID的实现方式类似,依旧借助 starlette-context 实现,改造如下:
common/middlewares.py
from contextvars import Token
from starlette.middleware.base import RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import Response
from starlette_context import _request_scope_context_storage
from starlette_context.middleware import ContextMiddleware as BaseContextMiddleware
class ContextMiddleware(BaseContextMiddleware):
    """Context middleware that leaves the request context set after the response.

    The context variable is set for each request and deliberately never reset,
    so the stored values remain readable after ``dispatch`` returns
    (presumably so the access-log formatter can still read them - confirm).
    """

    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        """Set the request context, run the endpoint, then let plugins enrich the response."""
        request_context = await self.set_context(request)
        # The token returned by set() is discarded on purpose: no reset happens.
        _request_scope_context_storage.set(request_context)

        response = await call_next(request)
        for plugin in self.plugins:
            await plugin.enrich_response(response)
        return response
主要的修改点是之前请求完成后,会重置全局上下文变量,删除了这个重置动作。
common/request_time.py
import time
from typing import Union
from starlette.requests import Request
from starlette.responses import Response
from starlette_context import _request_scope_context_storage
from starlette_context import context
from starlette_context.plugins.plugin import Plugin
# Context key for the request start timestamp (the RequestTimePlugin key).
REQUEST_START_TIME_CTX_KEY = 'X-Request-Start-Time'
# Context key for the elapsed request time, written in enrich_response.
REQUEST_TIME_CTX_KEY = 'X-Request-Time'
class RequestTimePlugin(Plugin):
    """Plugin that records the request start time and derives the elapsed time."""

    key = REQUEST_START_TIME_CTX_KEY

    async def process_request(self, request: Request) -> Union[str, int, dict, float]:
        """Return the current timestamp as this plugin's value for the request."""
        assert isinstance(self.key, str)
        started_at = time.time()
        return started_at

    async def enrich_response(self, response: Response) -> None:
        """Replace the stored start time with the elapsed time in seconds."""
        ctx_data: dict = _request_scope_context_storage.get()
        started_at = ctx_data.pop(self.key)
        elapsed = time.time() - started_at
        ctx_data[REQUEST_TIME_CTX_KEY] = elapsed
        _request_scope_context_storage.set(ctx_data)
def current_request_time(precision=3):
    """Return the request duration in seconds, rounded to *precision* digits.

    Uses the stored elapsed time when it is present in the context; otherwise
    measures from the stored start time up to now.
    """
    data = context.data
    if REQUEST_TIME_CTX_KEY in data:
        elapsed = data[REQUEST_TIME_CTX_KEY]
    else:
        elapsed = time.time() - data[REQUEST_START_TIME_CTX_KEY]
    return round(elapsed, precision)
common/log_handler.py
class RequestAccessFormatter(AccessFormatter):
    """Access-log formatter exposing ``request_time`` and ``request_id``, without colors."""

    def __init__(self, fmt=None, datefmt=None, style="%", use_colors=None):
        """Initialise the formatter; ``use_colors`` is accepted but colors stay off."""
        super().__init__(fmt=fmt, datefmt=datefmt, style=style)
        self.use_colors = False

    def get_status_code(self, record):
        """Return the record's ``status_code`` value unchanged."""
        return record.__dict__["status_code"]

    def formatMessage(self, record):
        """Format a copy of *record* extended with the request time and request id."""
        enriched = copy(record)
        enriched.request_time = current_request_time()
        enriched.request_id = current_request_id()
        return super().formatMessage(enriched)
common/constants.py
# Access-log line including the request duration (suffixed with "s") after the status code.
ACCESS_LOG_FORMAT = (
    '[%(asctime)s] %(request_id)s %(status_code)s '
    '%(request_time)ss %(client_addr)s '
    '- "%(request_line)s"'
)
main.py:
# Middleware: register the request-id and request-time plugins.
application.add_middleware(ContextMiddleware, plugins=(RequestIdPlugin(), RequestTimePlugin()))
worker的修改参考RequestID部分的介绍。
最终的AccessLog格式大致为: