-
Notifications
You must be signed in to change notification settings - Fork 448
Description
Issue Description
Type: feature request
Describe what feature you want
背景
Sentinel 作为面向分布式应用场景、多语言异构化服务架构的流量治理组件,以丰富的流量防护能力满足了各种应用场景的限流需求。当下 AI 应用成为广大开发者关注的领域,但是想要将 AI 应用真正用于生产,高可用的能力是必不可少的,由此也出现了很多 AI 应用场景下新的流量防护需求,例如 Token 限流,Token 这个 AI 场景下的常用单位,在作为限流的统计维度时存在着限流时机与统计时机不一致,强需求集群限流等特点。
Token限流现状
AI网关Token限流能力
现在已经有多个AI网关(如Kong、Higress等)以插件的方式实现了LLM Token限流能力,限流算法大多使用固定窗口或滑动窗口,限流过程与传统限流的区别在于:限流信息的更新依赖于下游服务的调用结果。
竞品分析
项目名称 | Kong | Higress | APISIX | Envoy AI Gateway |
---|---|---|---|---|
简介 | 企业级 API 网关,提供商业化 AI 限流能力 | 开源云原生网关,源自阿里云实践 | 轻量级开源 API 网关 | 基于 Envoy 的可扩展 AI 网关 |
Token 维度 | 总/输入/输出/自定义成本 Token | 仅总 Token | 输入/输出/总 Token | CEL 表达式 |
存储策略 | 内存/内置 DB/Redis | Redis 强依赖 | 本地 LRU 缓存 | 内存/Redis |
请求维度 | IP/凭证/消费者/服务/请求头等 | IP/参数/请求头/Consumer | 仅模型名称 | 请求头/URL 参数/IP |
限流算法 | 固定+滑动窗口(秒级) | 固定窗口(秒至天) | 固定窗口(秒级) | Envoy 原生限流 |
响应头部 | 丰富 X-AI-RateLimit-* 信息 | 基础限流头部 | 基础限流头部 | 无 |
核心优势 | 多维度控制;成本限流;数据一致性 | 长周期限流;开源免费 | 轻量部署;实时反馈 | CEL 灵活定制;存储可选 |
主要不足 | 闭源付费;无降级策略 | 无输入输出区分;强依赖 Redis;无降级策略 | 无集群同步;扩展性差;无降级策略 | 配置复杂;无响应头部;无降级策略 |
开源情况 | 企业版闭源 | 开源 | 开源 | 开源 |
计划功能点对比
考虑到LLM应用的强集群需求,所以本issue不实现单机模式。
总结
尽管当前AI网关实现的Token限流能力已经非常多样化,但是仍存在功能点分散、流量临界、无法应对突发流量等问题,无法满足LLM开发框架(如LangChainGo、Eino等)实际需要的Token限流需求。
因此,本issue希望在 Sentinel 中实现基础的 Token 限流能力,并在此基础上,提供多样化的限流策略,限制 AI 应用的 Token 消耗,解决流量临界问题,应对突发流量,保障 AI 应用的整体稳定性。
预期提供的API
本issue预期提供Token限流相关的功能点如下:
- 核心功能:集群模式下的基础Token限流能力
- 实现1~2个LLM开发框架的适配器
- 扩展功能:预测误差时序分摊(Predictive Error Temporal Amortized,PETA)、LLM用量及限流响应信息
详细设计
总体流程图
LLM应用实例主要是指以LangChainGo为代表的LLM应用框架,计划Sentinel以包装函数的方式适配到LLM开发框架中,对模型调用Token进行限流。
基础Token限流
总体时序图
功能点列表
基础Token限流方案将包含下列功能点:
- 支持集群模式
- 支持多请求维度(优先支持header维度)
- 支持多时间维度
- 适配1~2个LLM开发框架
- 限流算法:固定窗口限流
初始化
在Token限流中,Sentinel需要读取配置文件完成以下初始化工作:
- 连接Redis
- 初始化Token限流规则(支持配置文件初始化、提供LoadRules API动态加载)
Entry设计
Sentinel使用Entry函数将业务逻辑封装起来,这一过程称为“埋点”,每个埋点都关联一个资源名称,它标识了触发该资源的调用或访问。函数原型如下:
func Entry(resource string, opts ...Option) (*base.SentinelEntry, *base.BlockError)
在微服务场景下,调用Entry函数时,传入的resource一般是作为接口路径或名称。
一般来说,由于Token限流总是作为网关接收外部请求,Token限流方案的流量类型TrafficType
固定为Inbound
,其中Inbound
代表入口流量,Outbound
代表出口流量。实际上,集群模式下的Token限流用不到TrafficType
字段。
下面是Entry函数调用示例:
// Wrapper request
reqInfos := llmtokenratelimit.GenerateRequestInfos(
llmtokenratelimit.WithHeader(header),
llmtokenratelimit.WithPrompts(prompts),
)
// Check
e, b := sentinel.Entry(resource, sentinel.WithTrafficType(base.Inbound), sentinel.WithArgs(reqInfos))
if b != nil {
// Blocked
println("Blocked")
continue
}
// Pass
println("Pass")
// Simulate llm service
time.Sleep(llmRunningTime)
// Update used token info
entry.SetPair(llmtokenratelimit.KeyUsedTokenInfos, usedTokenInfos)
// Must be executed immediately after the SetPair function
e.Exit()
请求信息包装
为了规范化传入Entry的请求参数信息,计划向用户提供一个参数结构体,用户需将本次请求的相关信息填写到结构体里面,再通过WithArgs方法传入到Entry中。
type RequestInfos struct {
Headers map[string][]string `json:"headers"`
Prompts []string `json:"prompts"`
}
LLM Token消耗信息包装
为了规范化LLM Token消耗信息,计划向用户提供一个参数结构体,用户需将本次LLM请求Token消耗的相关信息填写到结构体里面,再通过提供的API方法更新Token信息。
type UsedTokenInfos struct {
InputTokens int64 `json:"inputTokens"`
OutputTokens int64 `json:"outputTokens"`
TotalTokens int64 `json:"totalTokens"`
}
除此之外,为方便使用,计划以模型厂商维度提供一个包装函数,便于用户使用,如下为包装OpenAI token消耗的辅助函数:
func OpenAITokenExtractor(response interface{}) *UsedTokenInfos {
if response == nil {
return nil
}
resp, ok := response.(map[string]any)
if !ok {
return nil
}
inputTokens, ok := resp["prompt_tokens"].(int)
if !ok {
return nil
}
outputTokens, ok := resp["completion_tokens"].(int)
if !ok {
return nil
}
totalTokens, ok := resp["total_tokens"].(int)
if !ok {
return nil
}
return GenerateUsedTokenInfos(
WithInputTokens(inputTokens),
WithOutputTokens(outputTokens),
WithTotalTokens(totalTokens),
)
}
限流算法
下面描述如何使用固定窗口算法实现Token限流的完整过程。
对于用户的每个LLM调用请求,都需要先经过Sentinel的Entry函数,该函数使用责任链模式依次执行提前注册好的限流、熔断等组件初始化、规则检查、调用统计功能(称其为StatPrepareSlot、RuleCheckSlot、StatSlot)。所以,Token限流也需要以同样的方式实现这三类方法。
- StatPrepareSlot:Token限流不需要在该方法中完成任何事情。
- RuleCheckSlot:限流规则检查采用分层匹配设计。首先该方法会读取Entry传入的resource,然后将resource与所有已配置的限流规则(配置文件中对应的是resource字段)进行正则匹配。针对每条匹配命中的规则,将会提取每条具体规则项(配置文件中对应的是specificItems字段)与当前请求的相关信息进行具体规则项正则匹配,若匹配未命中,则跳过;否则,将规则项信息通过固定的格式组成Redis Key,然后利用Lua脚本原子读取Redis中剩余的Token数量,若剩余Token数量大于等于0,则放行请求,返回True;否则拒绝请求,返回False。Lua脚本实现如下:
-- KEYS[1]: Fixed Window Key ("<redisRatelimitKey>")
-- ARGV[1]: Maximum Token capacity
-- ARGV[2]: Window size (milliseconds)
local fixed_window_key = KEYS[1]
local max_token_capacity = tonumber(ARGV[1])
local window_size = tonumber(ARGV[2])
local ttl = redis.call('PTTL', fixed_window_key)
if ttl < 0 then
redis.call('SET', fixed_window_key, max_token_capacity, 'PX', window_size)
return {max_token_capacity, window_size}
end
return {redis.call('GET', fixed_window_key), ttl}
- StatSlot:放行请求后,读取实际消耗的Token数,通过SentinelEntry.SetPair方法记录Token消耗,然后在OnCompleted方法中依次遍历命中的具体规则项,根据当前具体规则项的Token计算策略,计算本次消耗的Token数,最后利用Lua脚本和decrby命令原子更新Redis中剩余的Token数量(细节:SetPair方法必须在SentinelEntry.Exit方法前执行,且Exit必须被立即执行,否则会更新失败)。Lua脚本实现如下:
-- KEYS[1]: Fixed Window Key ("<redisRatelimitKey>")
-- ARGV[1]: Maximum Token capacity
-- ARGV[2]: Window size (milliseconds)
-- ARGV[3]: Actual token consumption
local fixed_window_key = KEYS[1]
local max_token_capacity = tonumber(ARGV[1])
local window_size = tonumber(ARGV[2])
local actual = tonumber(ARGV[3])
local ttl = redis.call('PTTL', fixed_window_key)
if ttl < 0 then
redis.call('SET', fixed_window_key, max_token_capacity-actual, 'PX', window_size)
return {max_token_capacity-actual, window_size}
end
return {redis.call('DECRBY', fixed_window_key, actual), ttl}
限流规则检查补充例子说明
现在有规则配置如下:
- 接口前缀为
/a/
,检查请求中所有header键的前缀是X-CA-
、值的前缀是123
的键值对在60秒内的总token不超过900,下简称该规则配置为rules-A
,对应配置文件如下:
- resource: "/a/*"
specificItems:
- identifier:
type: header
value: "X-CA-*"
keyItems:
- key: "123*"
token:
number: 900
countStrategy: "total-tokens"
time:
unit: second
value: 60
现在,假设某个请求的接口路径或名称为/a/b
,该请求包含了1个header键值对{X-CA-A:123}
,将/a/b
作为resource传入Entry,首先会正则匹配到rules-A.resource
的/a/*
,再往下X-CA-A
二次正则匹配到了rules-A.specificItems[0].identifier.value
的X-CA-*
,再往下123
三次正则匹配到了rules-A.specificItems[0].keyItems[0].key
的123*
。
到这里为止,我们就认为该请求命中了该具体规则项,对应Redis
的Key=sentinel-go:llm-token-ratelimit:resource-<hashedResource>:fixed-window:header:60:total-tokens
,初始Value=900
,由于Value>=0
,请求放行。
接着,假设该请求消耗了500 input-tokens
、500 output-tokens
,那么total-tokens=1000
,更新Value=-100
(允许Value
为负)。
再然后,假设又来一个请求的接口名称为/a/c
,该请求同样包含了1个header键值对{X-CA-B:123456}
,依然能够匹配到rules-A
,对应Redis
的Key=sentinel-go:llm-token-ratelimit:resource-<hashedResource>:fixed-window:header:60:total-tokens
,但此时Value=-100
,由于Value<0
,所以拒绝该请求。
综上,我们通过将Rule中的所有统计指标都作为Redis Key
的一部分,能够区分同类请求并进行限流。
- 注:
hashedResource = murmur3(resource)
LLM框架适配
获取消耗的Token数
获取Completion方法消耗的Token数,可读取llms.ContentChoice中的GenerationInfo字段,得到消耗的输入、输出、推理(可选)、总Token数。该字段是llm.GenerateContent方法的返回值。
经调研,LangChainGo的embeddings.CreateEmbedding接口返回结果不包含Token消耗结果,以langchaingo/llms/openai/internal/openaiclient/openaiclient.go中实现的CreateEmbedding接口为例,该方法仅返回 [][]float32 作为词向量结果,且其中调用的createEmbedding属于私有方法,不好包装。综上所述,本issue不考虑支持Embedding方法。
框架适配
为了LLM框架更方便使用Sentinel提供Token限流能力,计划对部分知名LLM框架提供适配器。
由于未发现LangChainGo存在函数注入的Middleware接口,且LangChainGo提供的回调方法并不专用于GenerateContent方法。综合考虑,采取包装函数方式,传入大模型实例,包装llms.Model接口的GenerateContent、Call(Deprecated)这类文本补全、生成方法
下面以GenerateContent为例,说明如何包装和调用。
func (t *LLMWrapper) GenerateContent(
ctx context.Context,
messages []langchaingo.MessageContent,
options ...langchaingo.CallOption,
) (*langchaingo.ContentResponse, error) {
// 1.初始化
// 2. Sentinel限流检查
// 3. 正常调用模型
response, err := t.llm.GenerateContent(ctx, messages, options...)
if err != nil {
return nil, err
}
// 4. 根据调用结果更新Token数量
// 5. 返回模型结果
return response, nil
}
扩展功能项
预测误差时序分摊(Predictive Error Temporal Amortized,PETA)
总体时序图
算法描述
为了解决基础Token限流中面临的Token统计滞后性导致的非法请求错放问题,计划提出PETA,改善错放情况。
- 所谓时序分摊是指将低估的token分摊到未来的窗口中,进而影响后续请求
- 该算法结合了滑动窗口和类似于令牌桶(没有生成速率,而是回收过期窗口)的方法
基本过程:Token预扣使用外部库(tiktoken-go)计算出可能消耗的Token数量,然后提前更新Redis限流信息,等到实际调用后,再修正结果。
- 可以影响到 input-tokens、total-tokens
- 由于total-tokens依赖output-tokens,随机性大,预测效果差,不建议对total-tokens使用该策略
- output-tokens暂时无法预测,所以影响不到,预估token默认是0
- 注意:即使命中input-tokens规则,也并不会同时命中total-tokens规则
token计算策略:使用tiktoken-go计算初始输入内容预计消耗token数量;为了能够适应token消耗变化,需要每次Set更新预测值和真实值的差值
- 使用redis存储差值
- key格式:"<redisRatelimitKey>:token-encoder:<model-provider>:<model-name>"
- redisRatelimitKey格式:"sentinel-go:llm-token-ratelimit:<ruleName>:<strategy>:<identifierType>:<timeWindow>:<tokenCountStrategy>"
- 过期时间与限流策略一致
- key格式:"<redisRatelimitKey>:token-encoder:<model-provider>:<model-name>"
- 总结token预测计算策略如下
- estimatedToken = tiktoken(raw_contents)+query_difference(redis_key)
- 若estimatedToken 结果为负数,则重置difference为0,estimatedToken = tiktoken(raw_contents)
Token预扣:
-- KEYS[1]: Sliding Window Key ("{shard-<hashtag>}:sliding-window:<redisRatelimitKey>")
-- KEYS[2]: Token Bucket Key ("{shard-<hashtag>}:token-bucket:<redisRatelimitKey>")
-- KEYS[3]: Token Encoder Key ("{shard-<hashtag>}:token-encoder:<provider>:<model>:<redisRatelimitKey>")
-- ARGV[1]: Estimated token consumption
-- ARGV[2]: Current timestamp (milliseconds)
-- ARGV[3]: Token bucket capacity
-- ARGV[4]: Window size (milliseconds)
-- ARGV[5]: Random string for sliding window unique value (length less than or equal to 255)
local function calculate_tokens_in_range(key, start_time, end_time)
local valid_list = redis.call('ZRANGEBYSCORE', key, start_time, end_time)
local valid_tokens = 0
for _, v in ipairs(valid_list) do
local _, tokens = struct.unpack('Bc0L', v)
valid_tokens = valid_tokens + tokens
end
return valid_tokens
end
local sliding_window_key = tostring(KEYS[1])
local token_bucket_key = tostring(KEYS[2])
local token_encoder_key = tostring(KEYS[3])
local estimated = tonumber(ARGV[1])
local current_timestamp = tonumber(ARGV[2])
local bucket_capacity = tonumber(ARGV[3])
local window_size = tonumber(ARGV[4])
local random_string = tostring(ARGV[5])
-- Valid window start time
local window_start = current_timestamp - window_size
-- Waiting time
local waiting_time = 0
-- Get bucket
local bucket = redis.call('HMGET', token_bucket_key, 'capacity', 'max_capacity')
local current_capacity = tonumber(bucket[1])
local max_capacity = tonumber(bucket[2])
-- Initialize bucket manually if it doesn't exist
if not current_capacity then
current_capacity = bucket_capacity
max_capacity = bucket_capacity
redis.call('HMSET', token_bucket_key, 'capacity', bucket_capacity, 'max_capacity', bucket_capacity)
redis.call('ZADD', sliding_window_key, current_timestamp,
struct.pack('Bc0L', string.len(random_string), random_string, 0))
end
-- Calculate expired tokens
local released_tokens = calculate_tokens_in_range(sliding_window_key, 0, window_start)
if released_tokens > 0 then -- Expired tokens exist, attempt to replenish new tokens
-- Clean up expired data
redis.call('ZREMRANGEBYSCORE', sliding_window_key, 0, window_start)
-- Calculate valid tokens
local valid_tokens = calculate_tokens_in_range(sliding_window_key, '-inf', '+inf')
-- Update token count
if current_capacity + released_tokens > max_capacity then -- If current capacity plus released tokens exceeds max capacity, reset to max capacity minus valid tokens
current_capacity = max_capacity - valid_tokens
else -- Otherwise, directly add the released tokens
current_capacity = current_capacity + released_tokens
end
-- Immediately replenish new tokens
redis.call('HSET', token_bucket_key, 'capacity', current_capacity)
end
-- Plus the difference from the token encoder if it exists
local ttl = redis.call('PTTL', token_encoder_key)
local difference = tonumber(redis.call('GET', token_encoder_key))
if ttl < 0 then
difference = 0
else
if difference + estimated >= 0 then
estimated = estimated + difference
else
redis.call('SET', token_encoder_key, 0)
end
end
-- Check if the request can be satisfied
if max_capacity < estimated or estimated <= 0 then -- If max capacity is less than estimated consumption or estimated is less than or equal to 0, return -1 indicating rejection
waiting_time = -1
elseif current_capacity < estimated then -- If current capacity is insufficient to satisfy estimated consumption, calculate waiting time
-- Get the earliest valid timestamp
local first_valid_window = redis.call('ZRANGE', sliding_window_key, 0, 0, 'WITHSCORES')
local first_valid_start = tonumber(first_valid_window[2])
if not first_valid_start then
first_valid_start = current_timestamp
end
-- Waiting time = fixed delay + window size - valid window interval
waiting_time = 3 + window_size - (current_timestamp - first_valid_start)
else -- Otherwise, capacity satisfies estimated consumption, no waiting required, update data
redis.call('ZADD', sliding_window_key, current_timestamp,
struct.pack('Bc0L', string.len(random_string), random_string, estimated))
current_capacity = current_capacity - estimated
redis.call('HSET', token_bucket_key, 'capacity', current_capacity)
end
-- Set expiration time to window size plus 5 seconds buffer
redis.call('PEXPIRE', sliding_window_key, window_size + 5000)
redis.call('PEXPIRE', token_bucket_key, window_size + 5000)
redis.call('PEXPIRE', token_encoder_key, window_size + 5000)
return {current_capacity, waiting_time, estimated, difference}
Token修正:
-- KEYS[1]: Sliding Window Key ("{shard-<hashtag>}:sliding-window:<redisRatelimitKey>")
-- KEYS[2]: Token Bucket Key ("{shard-<hashtag>}:token-bucket:<redisRatelimitKey>")
-- KEYS[3]: Token Encoder Key ("{shard-<hashtag>}:token-encoder:<provider>:<model>:<redisRatelimitKey>")
-- ARGV[1]: Estimated token consumption
-- ARGV[2]: Current timestamp (milliseconds)
-- ARGV[3]: Token bucket capacity
-- ARGV[4]: Window size (milliseconds)
-- ARGV[5]: Actual token consumption
-- ARGV[6]: Random string for sliding window value (length less than or equal to 255)
local MAX_SEARCH_ITRATIONS = 64
local function calculate_tokens_in_range(key, start_time, end_time)
local valid_list = redis.call('ZRANGEBYSCORE', key, start_time, end_time)
local valid_tokens = 0
for _, v in ipairs(valid_list) do
local _, tokens = struct.unpack('Bc0L', v)
valid_tokens = valid_tokens + tokens
end
return valid_tokens
end
local function binary_search_compensation_time(key, L, R, window_size, max_capacity, predicted_error)
local iter = 0
while L < R and iter < MAX_SEARCH_ITRATIONS do
iter = iter + 1
local mid = math.floor((L + R) / 2)
local valid_tokens = calculate_tokens_in_range(key, mid - window_size, mid)
if valid_tokens + predicted_error <= max_capacity then
R = mid
else
L = mid + 1
end
end
return L
end
local sliding_window_key = tostring(KEYS[1])
local token_bucket_key = tostring(KEYS[2])
local token_encoder_key = tostring(KEYS[3])
local estimated = tonumber(ARGV[1])
local current_timestamp = tonumber(ARGV[2])
local bucket_capacity = tonumber(ARGV[3])
local window_size = tonumber(ARGV[4])
local actual = tonumber(ARGV[5])
local random_string = tostring(ARGV[6])
-- Valid window start time
local window_start = current_timestamp - window_size
-- Get bucket
local bucket = redis.call('HMGET', token_bucket_key, 'capacity', 'max_capacity')
local current_capacity = tonumber(bucket[1])
local max_capacity = tonumber(bucket[2])
-- Initialize bucket manually if it doesn't exist
if not current_capacity then
current_capacity = bucket_capacity
max_capacity = bucket_capacity
redis.call('HMSET', token_bucket_key, 'capacity', bucket_capacity, 'max_capacity', bucket_capacity)
redis.call('ZADD', sliding_window_key, current_timestamp,
struct.pack('Bc0L', string.len(random_string), random_string, 0))
end
-- Calculate expired tokens
local released_tokens = calculate_tokens_in_range(sliding_window_key, 0, window_start)
if released_tokens > 0 then -- Expired tokens exist, attempt to replenish new tokens
-- Clean up expired data
redis.call('ZREMRANGEBYSCORE', sliding_window_key, 0, window_start)
-- Calculate valid tokens
local valid_tokens = calculate_tokens_in_range(sliding_window_key, '-inf', '+inf')
-- Update token count
if current_capacity + released_tokens > max_capacity then -- If current capacity plus released tokens exceeds max capacity, reset to max capacity minus valid tokens
current_capacity = max_capacity - valid_tokens
else -- Otherwise, directly add the released tokens
current_capacity = current_capacity + released_tokens
end
-- Immediately replenish new tokens
redis.call('HSET', token_bucket_key, 'capacity', current_capacity)
end
-- Update the difference from the token encoder
local difference = actual - estimated
redis.call('SET', token_encoder_key, difference)
-- Correction result for reservation
local correct_result = 0
if estimated < 0 or actual < 0 then
correct_result = 3 -- Invalid value
elseif estimated < actual then -- Underestimation
-- Mainly handle underestimation cases to properly limit actual usage; overestimation may reject requests but won't affect downstream services
-- Calculate prediction error
local predicted_error = math.abs(actual - estimated)
-- directly deduct all underestimated tokens
current_capacity = current_capacity - predicted_error
redis.call('HSET', token_bucket_key, 'capacity', current_capacity)
-- Get the latest valid timestamp
local last_valid_window = redis.call('ZRANGE', sliding_window_key, -1, -1, 'WITHSCORES')
local compensation_start = tonumber(last_valid_window[2])
if not compensation_start then -- Possibly all data just expired, use current timestamp minus window size as start
compensation_start = current_timestamp
end
while predicted_error ~= 0 do -- Distribute to future windows until all error is distributed
if max_capacity >= predicted_error then
local compensation_time = binary_search_compensation_time(sliding_window_key, compensation_start,
compensation_start + window_size, window_size, max_capacity, predicted_error)
if calculate_tokens_in_range(sliding_window_key, compensation_time - window_size, compensation_time) +
predicted_error > max_capacity then
correct_result = 1 -- If the compensation time exceeds max capacity, return 1 to indicate failure
break
end
redis.call('ZADD', sliding_window_key, compensation_time,
struct.pack('Bc0L', string.len(random_string), random_string, predicted_error))
predicted_error = 0
else
redis.call('ZADD', sliding_window_key, compensation_start,
struct.pack('Bc0L', string.len(random_string), random_string, max_capacity))
predicted_error = predicted_error - max_capacity
compensation_start = compensation_start + window_size
end
end
elseif estimated > actual then -- Overestimation
correct_result = 2
end
-- Set expiration time to window size plus 5 seconds buffer
redis.call('PEXPIRE', sliding_window_key, window_size + 5000)
redis.call('PEXPIRE', token_bucket_key, window_size + 5000)
redis.call('PEXPIRE', token_encoder_key, window_size + 5000)
return {correct_result}
LLM用量及限流响应信息
计划在请求成功后的响应header中,添加LLM用量及限流信息。响应结构体声明如下:
type ResponseHeader struct {
headers map[string]string
ErrorCode int32
ErrorMessage string
}
- 请求被拒绝时,返回以下内容
- headers
- 请求id(X-Sentinel-LLM-Token-Ratelimit-RequestID)
- token剩余信息(X-Sentinel-LLM-Token-Ratelimit-RemainingTokens)
- 等待时间(X-Sentinel-LLM-Token-Ratelimit-WaitingTime)
- 错误码(ErrorCode)
- 错误信息(ErrorMessage)
- headers
- 请求被接收时,返回以下内容
- headers
- 请求id(X-Sentinel-LLM-Token-Ratelimit-RequestID)
- token剩余信息(X-Sentinel-LLM-Token-Ratelimit-RemainingTokens)
- headers
其中错误码和错误信息从配置文件中获取。
用户接入
接入步骤
从用户角度,接入Sentinel提供的Token限流功能,需要以下几步:
-
准备Redis实例
-
对 Sentinel 的运行环境进行相关配置并初始化。
- 仅支持从yaml文件初始化
-
埋点(定义资源),固定
ResourceType=ResTypeCommon
且TrafficType=Inbound
的资源类型 -
根据下面的配置文件加载规则,规则配置项包括:资源名称、限流策略、具体规则项、redis配置、错误码、错误信息。如下是配置规则的示例,具体字段含义在下文的“配置文件描述”中有具体说明。
_, err = llmtokenratelimit.LoadRules([]*llmtokenratelimit.Rule{ { Resource: ".*", Strategy: llmtokenratelimit.FixedWindow, SpecificItems: []llmtokenratelimit.SpecificItem{ { Identifier: llmtokenratelimit.Identifier{ Type: llmtokenratelimit.Header, Value: ".*", }, KeyItems: []llmtokenratelimit.KeyItem{ { Key: ".*", Token: llmtokenratelimit.Token{ Number: 1000, CountStrategy: llmtokenratelimit.TotalTokens, }, Time: llmtokenratelimit.Time{ Unit: llmtokenratelimit.Second, Value: 60, }, }, }, }, }, }, })
-
可选:创建LLM实例嵌入到提供的适配器中即可
配置文件描述
总体规则配置
配置项 | 类型 | 必填 | 默认值 | 说明 |
---|---|---|---|---|
enabled | bool | 否 | false | 是否启用LLM Token限流功能,取值:false(不启用)、true(启用) |
rules | array of rule object | 否 | nil | 限流规则 |
redis | object | 否 | redis实例连接信息 | |
errorCode | int | 否 | 429 | 错误码,设置为0时会修改为429 |
errorMessage | string | 否 | "Too Many Requests" | 错误信息 |
rule配置
配置项 | 类型 | 必填 | 默认值 | 说明 |
---|---|---|---|---|
resource | string | 否 | ".*" | 规则资源名称,支持正则表达式,取值:".*"(全局匹配)、用户自定义正则表达式 |
strategy | string | 否 | "fixed-window" | 限流策略,取值:fixed-window(固定窗口)、peta(预测误差时序分摊) |
encoding | object | 否 | token编码方式,专用于peta限流策略 | |
specificItems | array of specificItem object | 是 | 具体规则项 |
encoding配置
配置项 | 类型 | 必填 | 默认值 | 说明 |
---|---|---|---|---|
provider | string | 否 | "openai" | 模型厂商 |
model | string | 否 | "gpt-4" | 模型名称 |
specificItem配置
配置项 | 类型 | 必填 | 默认值 | 说明 |
---|---|---|---|---|
identifier | object | 否 | 请求标识符 | |
keyItems | array of keyItem object | 是 | 规则匹配的键值信息 |
identifier配置
配置项 | 类型 | 必填 | 默认值 | 说明 |
---|---|---|---|---|
type | string | 否 | "all" | 请求标识符类型,取值:all(全局限流)、header |
value | string | 否 | ".*" | 请求标识符取值,支持正则表达式,取值:".*"(全局匹配)、用户自定义正则表达式 |
keyItem配置
配置项 | 类型 | 必填 | 默认值 | 说明 |
---|---|---|---|---|
key | string | 否 | ".*" | 具体规则项取值,支持正则表达式,取值:".*"(全局匹配)、用户自定义正则表达式 |
token | object | 是 | token数量和计算策略配置 | |
time | object | 是 | 时间单位和周期配置 |
token配置
配置项 | 类型 | 必填 | 默认值 | 说明 |
---|---|---|---|---|
number | int | 是 | token数量 | |
countStrategy | string | 否 | "total-tokens" | token计算策略,取值:input-tokens、output-tokens、total-tokens |
time配置
配置项 | 类型 | 必填 | 默认值 | 说明 |
---|---|---|---|---|
unit | string | 是 | 时间单位,取值:second、minute、hour、day | |
value | int | 是 | 时间值 |
redis配置
配置项 | 类型 | 必填 | 默认值 | 说明 |
---|---|---|---|---|
addrs | array of addr object | 否 | [{name: "127.0.0.1", port: 6379}] | redis节点服务,见注意事项说明 |
username | string | 否 | 空字符串 | redis用户名 |
password | string | 否 | 空字符串 | redis密码 |
dialTimeout | int | 否 | 0 | 建立redis连接的最长等待时间,单位:毫秒 |
readTimeout | int | 否 | 0 | 等待Redis服务器响应的最长时间,单位:毫秒 |
writeTimeout | int | 否 | 0 | 向网络连接发送命令数据的最长时间,单位:毫秒 |
poolTimeout | int | 否 | 0 | 从连接池获取一个空闲连接的最大等待时间,单位:毫秒 |
poolSize | int | 否 | 10 | 连接池中的连接数量 |
minIdleConns | int | 否 | 5 | 连接池闲置连接的最少数量 |
maxRetries | int | 否 | 3 | 操作失败,最大尝试次数 |
addr配置
配置项 | 类型 | 必填 | 默认值 | 说明 |
---|---|---|---|---|
name | string | 否 | "127.0.0.1" | redis节点服务名称,带服务类型的完整 FQDN 名称,例如 my-redis.dns、redis.my-ns.svc.cluster.local |
port | int | 否 | 6379 | redis节点服务端口 |
总体配置文件示例
version: "v1"
sentinel:
app:
name: sentinel-go-demo
log:
metric:
maxFileCount: 7
llmTokenRatelimit:
enabled: true,
rules:
- resource: ".*"
strategy: "fixed-window"
specificItems:
- identifier:
type: "header"
value: ".*"
keyItems:
- key: ".*"
token:
number: 1000
countStrategy: "total-tokens"
time:
unit: "second"
value: 60
errorCode: 429
errorMessage: "Too Many Requests"
redis:
addrs:
- name: "127.0.0.1"
port: 6379
username: "redis"
password: "redis"
dialTimeout: 5000
readTimeout: 5000
writeTimeout: 5000
poolTimeout: 5000
poolSize: 10
minIdleConns: 5
maxRetries: 3
LLM框架适配
目前支持Langchaingo和Eino框架无侵入式接入Sentinel提供的Token限流能力,主要适用于文本生成方面,使用方法详见
- pkg/adapters/langchaingo/wrapper.go
- pkg/adapters/eino/wrapper.go
注意事项
- PETA使用tiktoken预估输入消耗token数,但是需要下载或预先配置
字节对编码(Byte Pair Encoding,BPE)
字典- 在线模式
- 首次使用时,tiktoken需要联网下载编码文件
- 离线模式
- 预先准备缓存好的tiktoken的编码文件(非直接下载文件,而是经过tiktoken处理后的文件),并通过配置TIKTOKEN_CACHE_DIR环境变量指定文件目录位置
- 在线模式
- 规则去重说明
- keyItems中,若仅number不同,会去重保留最新的number
- specificItems中,仅保留去重后的keyItems
- resource中,仅保留最新的resource
- redis配置说明
- 若连接的redis是集群模式,那么addrs里面的地址数量必须大于等于2个,否则会默认进入redis单点模式,导致限流失效