Skip to content

[Feature] 添加LLM Token限流功能 | Add LLM Token Rate Limit #596

@dancing-ui

Description

@dancing-ui

Issue Description

Type: feature request

Describe what feature you want

背景

Sentinel 作为面向分布式应用场景、多语言异构化服务架构的流量治理组件,以丰富的流量防护能力满足了各种应用场景的限流需求。当下 AI 应用成为广大开发者关注的领域,但是想要将 AI 应用真正用于生产,高可用的能力是必不可少的,由此也出现了很多 AI 应用场景下新的流量防护需求,例如 Token 限流,Token 这个 AI 场景下的常用单位,在作为限流的统计维度时存在着限流时机与统计时机不一致,强需求集群限流等特点。

Token限流现状

AI网关Token限流能力

现在已经有多个AI网关(如Kong、Higress等)以插件的方式实现了LLM Token限流能力,限流算法大多使用固定窗口或滑动窗口,限流过程与传统限流的区别在于:限流信息的更新依赖于下游服务的调用结果

竞品分析

项目名称 Kong Higress APISIX Envoy AI Gateway
简介 企业级 API 网关,提供商业化 AI 限流能力 开源云原生网关,源自阿里云实践 轻量级开源 API 网关 基于 Envoy 的可扩展 AI 网关
Token 维度 总/输入/输出/自定义成本 Token 仅总 Token 输入/输出/总 Token CEL 表达式
存储策略 内存/内置 DB/Redis Redis 强依赖 本地 LRU 缓存 内存/Redis
请求维度 IP/凭证/消费者/服务/请求头等 IP/参数/请求头/Consumer 仅模型名称 请求头/URL 参数/IP
限流算法 固定+滑动窗口(秒级) 固定窗口(秒至天) 固定窗口(秒级) Envoy 原生限流
响应头部 丰富 X-AI-RateLimit-* 信息 基础限流头部 基础限流头部
核心优势 多维度控制;成本限流;数据一致性 长周期限流;开源免费 轻量部署;实时反馈 CEL 灵活定制;存储可选
主要不足 闭源付费;无降级策略 无输入输出区分;强依赖 Redis;无降级策略 无集群同步;扩展性差;无降级策略 配置复杂;无响应头部;无降级策略
开源情况 企业版闭源 开源 开源 开源

计划功能点对比Image

考虑到LLM应用的强集群需求,所以本issue不实现单机模式

总结

尽管当前AI网关实现的Token限流能力已经非常多样化,但是仍存在功能点分散、流量临界、无法应对突发流量等问题,无法满足LLM开发框架(如LangChainGoEino等)实际需要的Token限流需求。

因此,本issue希望在 Sentinel 中实现基础的 Token 限流能力,并在此基础上,提供多样化的限流策略,限制 AI 应用的 Token 消耗,解决流量临界问题,应对突发流量,保障 AI 应用的整体稳定性。

预期提供的API

本issue预期提供Token限流相关的功能点如下:

  • 核心功能:集群模式下的基础Token限流能力
  • 实现1~2个LLM开发框架的适配器
  • 扩展功能:预测误差时序分摊(Predictive Error Temporal Amortized,PETA)、LLM用量及限流响应信息

详细设计

总体流程图Image

LLM应用实例主要是指以LangChainGo为代表的LLM应用框架,计划Sentinel以包装函数的方式适配到LLM开发框架中,对模型调用Token进行限流。

基础Token限流

总体时序图Image
功能点列表

基础Token限流方案将包含下列功能点:

  • 支持集群模式
  • 支持多请求维度(优先支持header维度)
  • 支持多时间维度
  • 适配1~2个LLM开发框架
  • 限流算法:固定窗口限流
初始化

在Token限流中,Sentinel需要读取配置文件完成以下初始化工作:

  • 连接Redis
  • 初始化Token限流规则(支持配置文件初始化、提供LoadRules API动态加载)
Entry设计

Sentinel使用Entry函数将业务逻辑封装起来,这一过程称为“埋点”,每个埋点都关联一个资源名称,它标识了触发该资源的调用或访问。函数原型如下:

func Entry(resource string, opts ...Option) (*base.SentinelEntry, *base.BlockError)

在微服务场景下,调用Entry函数时,传入的resource一般是作为接口路径或名称。

一般来说,由于Token限流总是作为网关接收外部请求,Token限流方案的流量类型TrafficType固定为Inbound,其中Inbound代表入口流量,Outbound代表出口流量。实际上,集群模式下的Token限流用不到TrafficType字段。

下面是Entry函数调用示例:

// Wrapper request
reqInfos := llmtokenratelimit.GenerateRequestInfos(
    llmtokenratelimit.WithHeader(header),
    llmtokenratelimit.WithPrompts(prompts),
)
// Check
e, b := sentinel.Entry(resource, sentinel.WithTrafficType(base.Inbound), sentinel.WithArgs(reqInfos))
if b != nil {
    // Blocked
    println("Blocked")
    continue
}
// Pass
println("Pass")
// Simulate llm service
time.Sleep(llmRunningTime)
// Update used token info
entry.SetPair(llmtokenratelimit.KeyUsedTokenInfos, usedTokenInfos)
// Must be executed immediately after the SetPair function
e.Exit()
请求信息包装

为了规范化传入Entry的请求参数信息,计划向用户提供一个参数结构体,用户需将本次请求的相关信息填写到结构体里面,再通过WithArgs方法传入到Entry中。

type RequestInfos struct {
	Headers map[string][]string `json:"headers"`
	Prompts []string            `json:"prompts"`
}
LLM Token消耗信息包装

为了规范化LLM Token消耗信息,计划向用户提供一个参数结构体,用户需将本次LLM请求Token消耗的相关信息填写到结构体里面,再通过提供的API方法更新Token信息。

type UsedTokenInfos struct {
	InputTokens  int64 `json:"inputTokens"`
	OutputTokens int64 `json:"outputTokens"`
	TotalTokens  int64 `json:"totalTokens"`
}

除此之外,为方便使用,计划以模型厂商维度提供一个包装函数,便于用户使用,如下为包装OpenAI token消耗的辅助函数:

func OpenAITokenExtractor(response interface{}) *UsedTokenInfos {
	if response == nil {
		return nil
	}

	resp, ok := response.(map[string]any)
	if !ok {
		return nil
	}

	inputTokens, ok := resp["prompt_tokens"].(int)
	if !ok {
		return nil
	}
	outputTokens, ok := resp["completion_tokens"].(int)
	if !ok {
		return nil
	}
	totalTokens, ok := resp["total_tokens"].(int)
	if !ok {
		return nil
	}

	return GenerateUsedTokenInfos(
		WithInputTokens(inputTokens),
		WithOutputTokens(outputTokens),
		WithTotalTokens(totalTokens),
	)
}
限流算法

下面描述如何使用固定窗口算法实现Token限流的完整过程。

对于用户的每个LLM调用请求,都需要先经过Sentinel的Entry函数,该函数使用责任链模式依次执行提前注册好的限流、熔断等组件初始化、规则检查、调用统计功能(称其为StatPrepareSlot、RuleCheckSlot、StatSlot)。所以,Token限流也需要以同样的方式实现这三类方法。

  • StatPrepareSlot:Token限流不需要在该方法中完成任何事情。
  • RuleCheckSlot:限流规则检查采用分层匹配设计。首先该方法会读取Entry传入的resource,然后将resource与所有已配置的限流规则(配置文件中对应的是resource字段)进行正则匹配。针对每条匹配命中的规则,将会提取每条具体规则项(配置文件中对应的是specificItems字段)与当前请求的相关信息进行具体规则项正则匹配,若匹配未命中,则跳过;否则,将规则项信息通过固定的格式组成Redis Key,然后利用Lua脚本原子读取Redis中剩余的Token数量,若剩余Token数量大于等于0,则放行请求,返回True;否则拒绝请求,返回False。Lua脚本实现如下:
-- KEYS[1]: Fixed Window Key ("<redisRatelimitKey>")

-- ARGV[1]: Maximum Token capacity
-- ARGV[2]: Window size (milliseconds)

local fixed_window_key = KEYS[1]

local max_token_capacity = tonumber(ARGV[1])
local window_size = tonumber(ARGV[2])

local ttl = redis.call('PTTL', fixed_window_key)
if ttl < 0 then
    redis.call('SET', fixed_window_key, max_token_capacity, 'PX', window_size)
    return {max_token_capacity, window_size}
end
return {redis.call('GET', fixed_window_key), ttl}
  • StatSlot:放行请求后,读取实际消耗的Token数,通过SentinelEntry.SetPair方法记录Token消耗,然后在OnCompleted方法中依次遍历命中的具体规则项,根据当前具体规则项的Token计算策略,计算本次消耗的Token数,最后利用Lua脚本和decrby命令原子更新Redis中剩余的Token数量(细节:SetPair方法必须在SentinelEntry.Exit方法前执行,且Exit必须被立即执行,否则会更新失败)。Lua脚本实现如下:
-- KEYS[1]: Fixed Window Key ("<redisRatelimitKey>")

-- ARGV[1]: Maximum Token capacity
-- ARGV[2]: Window size (milliseconds)
-- ARGV[3]: Actual token consumption

local fixed_window_key = KEYS[1]

local max_token_capacity = tonumber(ARGV[1])
local window_size = tonumber(ARGV[2])
local actual = tonumber(ARGV[3])

local ttl = redis.call('PTTL', fixed_window_key)
if ttl < 0 then
    redis.call('SET', fixed_window_key, max_token_capacity-actual, 'PX', window_size)
    return {max_token_capacity-actual, window_size}
end
return {redis.call('DECRBY', fixed_window_key, actual), ttl}
限流规则检查补充例子说明

现在有规则配置如下:

  • 接口前缀为/a/,检查请求中所有header键的前缀是X-CA-、值的前缀是123的键值对在60秒内的总token不超过900,下简称该规则配置为rules-A,对应配置文件如下:
- resource: "/a/*"
  specificItems:
    - identifier:
        type: header
        value: "X-CA-*"
      keyItems:
        - key: "123*"
          token: 
            number: 900
            countStrategy: "total-tokens"
          time:
            unit: second
            value: 60

现在,假设某个请求的接口路径或名称为/a/b,该请求包含了1个header键值对{X-CA-A:123},将/a/b作为resource传入Entry,首先会正则匹配到rules-A.resource/a/*,再往下X-CA-A二次正则匹配到了rules-A.specificItems[0].identifier.valueX-CA-*,再往下123三次正则匹配到了rules-A.specificItems[0].keyItems[0].key123*

到这里为止,我们就认为该请求命中了该具体规则项,对应RedisKey=sentinel-go:llm-token-ratelimit:resource-<hashedResource>:fixed-window:header:60:total-tokens,初始Value=900,由于Value>=0,请求放行。

接着,假设该请求消耗了500 input-tokens500 output-tokens,那么total-tokens=1000,更新Value=-100(允许Value为负)。

再然后,假设又来一个请求的接口名称为/a/c,该请求同样包含了1个header键值对{X-CA-B:123456},依然能够匹配到rules-A,对应RedisKey=sentinel-go:llm-token-ratelimit:resource-<hashedResource>:fixed-window:header:60:total-tokens,但此时Value=-100,由于Value<0,所以拒绝该请求。

综上,我们通过将Rule中的所有统计指标都作为Redis Key的一部分,能够区分同类请求并进行限流。

  • 注:hashedResource = murmur3(resource)
LLM框架适配
获取消耗的Token数

获取Completion方法消耗的Token数,可读取llms.ContentChoice中的GenerationInfo字段,得到消耗的输入、输出、推理(可选)、总Token数。该字段是llm.GenerateContent方法的返回值。

经调研,LangChainGo的embeddings.CreateEmbedding接口返回结果不包含Token消耗结果,以langchaingo/llms/openai/internal/openaiclient/openaiclient.go中实现的CreateEmbedding接口为例,该方法仅返回 [][]float32 作为词向量结果,且其中调用的createEmbedding属于私有方法,不好包装。综上所述,本issue不考虑支持Embedding方法

框架适配

为了LLM框架更方便使用Sentinel提供Token限流能力,计划对部分知名LLM框架提供适配器。

由于未发现LangChainGo存在函数注入的Middleware接口,且LangChainGo提供的回调方法并不专用于GenerateContent方法。综合考虑,采取包装函数方式,传入大模型实例,包装llms.Model接口的GenerateContent、Call(Deprecated)这类文本补全、生成方法

下面以GenerateContent为例,说明如何包装和调用。

func (t *LLMWrapper) GenerateContent(
	ctx context.Context,
	messages []langchaingo.MessageContent,
	options ...langchaingo.CallOption,
) (*langchaingo.ContentResponse, error) {
    // 1.初始化
	// 2. Sentinel限流检查
	// 3. 正常调用模型
	response, err := t.llm.GenerateContent(ctx, messages, options...)
	if err != nil {
		return nil, err
	} 
	// 4. 根据调用结果更新Token数量
    // 5. 返回模型结果
	return response, nil
}

扩展功能项

预测误差时序分摊(Predictive Error Temporal Amortized,PETA)
总体时序图Image
算法描述

为了解决基础Token限流中面临的Token统计滞后性导致的非法请求错放问题,计划提出PETA,改善错放情况。

  • 所谓时序分摊是指将低估的token分摊到未来的窗口中,进而影响后续请求
  • 该算法结合了滑动窗口和类似于令牌桶(没有生成速率,而是回收过期窗口)的方法

基本过程:Token预扣使用外部库(tiktoken-go)计算出可能消耗的Token数量,然后提前更新Redis限流信息,等到实际调用后,再修正结果。

  • 可以影响到 input-tokens、total-tokens
    • 由于total-tokens依赖output-tokens,随机性大,预测效果差,不建议对total-tokens使用该策略
  • output-tokens暂时无法预测,所以影响不到,预估token默认是0
  • 注意:即使命中input-tokens规则,也并不会同时命中total-tokens规则

token计算策略:使用tiktoken-go计算初始输入内容预计消耗token数量;为了能够适应token消耗变化,需要每次Set更新预测值和真实值的差值

  • 使用redis存储差值
    • key格式:"<redisRatelimitKey>:token-encoder:<model-provider>:<model-name>"
      • redisRatelimitKey格式:"sentinel-go:llm-token-ratelimit:<ruleName>:<strategy>:<identifierType>:<timeWindow>:<tokenCountStrategy>"
    • 过期时间与限流策略一致
  • 总结token预测计算策略如下
    • estimatedToken = tiktoken(raw_contents)+query_difference(redis_key)
    • 若estimatedToken 结果为负数,则重置difference为0,estimatedToken = tiktoken(raw_contents)

Token预扣:

-- KEYS[1]: Sliding Window Key ("{shard-<hashtag>}:sliding-window:<redisRatelimitKey>")
-- KEYS[2]: Token Bucket Key ("{shard-<hashtag>}:token-bucket:<redisRatelimitKey>")
-- KEYS[3]: Token Encoder Key ("{shard-<hashtag>}:token-encoder:<provider>:<model>:<redisRatelimitKey>")
-- ARGV[1]: Estimated token consumption
-- ARGV[2]: Current timestamp (milliseconds)
-- ARGV[3]: Token bucket capacity
-- ARGV[4]: Window size (milliseconds)
-- ARGV[5]: Random string for sliding window unique value (length less than or equal to 255)
local function calculate_tokens_in_range(key, start_time, end_time)
    local valid_list = redis.call('ZRANGEBYSCORE', key, start_time, end_time)
    local valid_tokens = 0
    for _, v in ipairs(valid_list) do
        local _, tokens = struct.unpack('Bc0L', v)
        valid_tokens = valid_tokens + tokens
    end
    return valid_tokens
end

local sliding_window_key = tostring(KEYS[1])
local token_bucket_key = tostring(KEYS[2])
local token_encoder_key = tostring(KEYS[3])

local estimated = tonumber(ARGV[1])
local current_timestamp = tonumber(ARGV[2])
local bucket_capacity = tonumber(ARGV[3])
local window_size = tonumber(ARGV[4])
local random_string = tostring(ARGV[5])

-- Valid window start time
local window_start = current_timestamp - window_size
-- Waiting time
local waiting_time = 0
-- Get bucket
local bucket = redis.call('HMGET', token_bucket_key, 'capacity', 'max_capacity')
local current_capacity = tonumber(bucket[1])
local max_capacity = tonumber(bucket[2])
-- Initialize bucket manually if it doesn't exist
if not current_capacity then
    current_capacity = bucket_capacity
    max_capacity = bucket_capacity
    redis.call('HMSET', token_bucket_key, 'capacity', bucket_capacity, 'max_capacity', bucket_capacity)
    redis.call('ZADD', sliding_window_key, current_timestamp,
        struct.pack('Bc0L', string.len(random_string), random_string, 0))
end
-- Calculate expired tokens
local released_tokens = calculate_tokens_in_range(sliding_window_key, 0, window_start)
if released_tokens > 0 then -- Expired tokens exist, attempt to replenish new tokens
    -- Clean up expired data
    redis.call('ZREMRANGEBYSCORE', sliding_window_key, 0, window_start)
    -- Calculate valid tokens
    local valid_tokens = calculate_tokens_in_range(sliding_window_key, '-inf', '+inf')
    -- Update token count
    if current_capacity + released_tokens > max_capacity then -- If current capacity plus released tokens exceeds max capacity, reset to max capacity minus valid tokens
        current_capacity = max_capacity - valid_tokens
    else -- Otherwise, directly add the released tokens
        current_capacity = current_capacity + released_tokens
    end
    -- Immediately replenish new tokens
    redis.call('HSET', token_bucket_key, 'capacity', current_capacity)
end
-- Plus the difference from the token encoder if it exists
local ttl = redis.call('PTTL', token_encoder_key)
local difference = tonumber(redis.call('GET', token_encoder_key))
if ttl < 0 then
    difference = 0
else
    if difference + estimated >= 0 then
        estimated = estimated + difference
    else
        redis.call('SET', token_encoder_key, 0)
    end
end
-- Check if the request can be satisfied
if max_capacity < estimated or estimated <= 0 then -- If max capacity is less than estimated consumption or estimated is less than or equal to 0, return -1 indicating rejection
    waiting_time = -1
elseif current_capacity < estimated then -- If current capacity is insufficient to satisfy estimated consumption, calculate waiting time
    -- Get the earliest valid timestamp
    local first_valid_window = redis.call('ZRANGE', sliding_window_key, 0, 0, 'WITHSCORES')
    local first_valid_start = tonumber(first_valid_window[2])
    if not first_valid_start then
        first_valid_start = current_timestamp
    end
    -- Waiting time = fixed delay + window size - valid window interval
    waiting_time = 3 + window_size - (current_timestamp - first_valid_start)
else -- Otherwise, capacity satisfies estimated consumption, no waiting required, update data
    redis.call('ZADD', sliding_window_key, current_timestamp,
        struct.pack('Bc0L', string.len(random_string), random_string, estimated))
    current_capacity = current_capacity - estimated
    redis.call('HSET', token_bucket_key, 'capacity', current_capacity)
end

-- Set expiration time to window size plus 5 seconds buffer
redis.call('PEXPIRE', sliding_window_key, window_size + 5000)
redis.call('PEXPIRE', token_bucket_key, window_size + 5000)
redis.call('PEXPIRE', token_encoder_key, window_size + 5000)

return {current_capacity, waiting_time, estimated, difference}

Token修正:

-- KEYS[1]: Sliding Window Key ("{shard-<hashtag>}:sliding-window:<redisRatelimitKey>")
-- KEYS[2]: Token Bucket Key ("{shard-<hashtag>}:token-bucket:<redisRatelimitKey>")
-- KEYS[3]: Token Encoder Key ("{shard-<hashtag>}:token-encoder:<provider>:<model>:<redisRatelimitKey>")
-- ARGV[1]: Estimated token consumption
-- ARGV[2]: Current timestamp (milliseconds)
-- ARGV[3]: Token bucket capacity
-- ARGV[4]: Window size (milliseconds)
-- ARGV[5]: Actual token consumption
-- ARGV[6]: Random string for sliding window value (length less than or equal to 255)
local MAX_SEARCH_ITRATIONS = 64

local function calculate_tokens_in_range(key, start_time, end_time)
    local valid_list = redis.call('ZRANGEBYSCORE', key, start_time, end_time)
    local valid_tokens = 0
    for _, v in ipairs(valid_list) do
        local _, tokens = struct.unpack('Bc0L', v)
        valid_tokens = valid_tokens + tokens
    end
    return valid_tokens
end

local function binary_search_compensation_time(key, L, R, window_size, max_capacity, predicted_error)
    local iter = 0
    while L < R and iter < MAX_SEARCH_ITRATIONS do
        iter = iter + 1
        local mid = math.floor((L + R) / 2)
        local valid_tokens = calculate_tokens_in_range(key, mid - window_size, mid)
        if valid_tokens + predicted_error <= max_capacity then
            R = mid
        else
            L = mid + 1
        end
    end
    return L
end

local sliding_window_key = tostring(KEYS[1])
local token_bucket_key = tostring(KEYS[2])
local token_encoder_key = tostring(KEYS[3])

local estimated = tonumber(ARGV[1])
local current_timestamp = tonumber(ARGV[2])
local bucket_capacity = tonumber(ARGV[3])
local window_size = tonumber(ARGV[4])
local actual = tonumber(ARGV[5])
local random_string = tostring(ARGV[6])

-- Valid window start time
local window_start = current_timestamp - window_size
-- Get bucket
local bucket = redis.call('HMGET', token_bucket_key, 'capacity', 'max_capacity')
local current_capacity = tonumber(bucket[1])
local max_capacity = tonumber(bucket[2])
-- Initialize bucket manually if it doesn't exist
if not current_capacity then
    current_capacity = bucket_capacity
    max_capacity = bucket_capacity
    redis.call('HMSET', token_bucket_key, 'capacity', bucket_capacity, 'max_capacity', bucket_capacity)
    redis.call('ZADD', sliding_window_key, current_timestamp,
        struct.pack('Bc0L', string.len(random_string), random_string, 0))
end
-- Calculate expired tokens
local released_tokens = calculate_tokens_in_range(sliding_window_key, 0, window_start)
if released_tokens > 0 then -- Expired tokens exist, attempt to replenish new tokens
    -- Clean up expired data
    redis.call('ZREMRANGEBYSCORE', sliding_window_key, 0, window_start)
    -- Calculate valid tokens
    local valid_tokens = calculate_tokens_in_range(sliding_window_key, '-inf', '+inf')
    -- Update token count
    if current_capacity + released_tokens > max_capacity then -- If current capacity plus released tokens exceeds max capacity, reset to max capacity minus valid tokens
        current_capacity = max_capacity - valid_tokens
    else -- Otherwise, directly add the released tokens
        current_capacity = current_capacity + released_tokens
    end
    -- Immediately replenish new tokens
    redis.call('HSET', token_bucket_key, 'capacity', current_capacity)
end
-- Update the difference from the token encoder
local difference = actual - estimated
redis.call('SET', token_encoder_key, difference)
-- Correction result for reservation
local correct_result = 0
if estimated < 0 or actual < 0 then
    correct_result = 3 -- Invalid value
elseif estimated < actual then -- Underestimation
    -- Mainly handle underestimation cases to properly limit actual usage; overestimation may reject requests but won't affect downstream services
    -- Calculate prediction error
    local predicted_error = math.abs(actual - estimated)
    -- directly deduct all underestimated tokens
    current_capacity = current_capacity - predicted_error
    redis.call('HSET', token_bucket_key, 'capacity', current_capacity)
    -- Get the latest valid timestamp
    local last_valid_window = redis.call('ZRANGE', sliding_window_key, -1, -1, 'WITHSCORES')
    local compensation_start = tonumber(last_valid_window[2])
    if not compensation_start then -- Possibly all data just expired, use current timestamp minus window size as start
        compensation_start = current_timestamp
    end
    while predicted_error ~= 0 do -- Distribute to future windows until all error is distributed
        if max_capacity >= predicted_error then
            local compensation_time = binary_search_compensation_time(sliding_window_key, compensation_start,
                compensation_start + window_size, window_size, max_capacity, predicted_error)
            if calculate_tokens_in_range(sliding_window_key, compensation_time - window_size, compensation_time) +
                predicted_error > max_capacity then
                correct_result = 1 -- If the compensation time exceeds max capacity, return 1 to indicate failure
                break
            end
            redis.call('ZADD', sliding_window_key, compensation_time,
                struct.pack('Bc0L', string.len(random_string), random_string, predicted_error))
            predicted_error = 0
        else
            redis.call('ZADD', sliding_window_key, compensation_start,
                struct.pack('Bc0L', string.len(random_string), random_string, max_capacity))
            predicted_error = predicted_error - max_capacity
            compensation_start = compensation_start + window_size
        end
    end
elseif estimated > actual then -- Overestimation
    correct_result = 2
end

-- Set expiration time to window size plus 5 seconds buffer
redis.call('PEXPIRE', sliding_window_key, window_size + 5000)
redis.call('PEXPIRE', token_bucket_key, window_size + 5000)
redis.call('PEXPIRE', token_encoder_key, window_size + 5000)

return {correct_result}
LLM用量及限流响应信息

计划在请求成功后的响应header中,添加LLM用量及限流信息。响应结构体声明如下:

type ResponseHeader struct {
	headers      map[string]string
	ErrorCode    int32
	ErrorMessage string
}
  • 请求被拒绝时,返回以下内容
    • headers
      • 请求id(X-Sentinel-LLM-Token-Ratelimit-RequestID)
      • token剩余信息(X-Sentinel-LLM-Token-Ratelimit-RemainingTokens)
      • 等待时间(X-Sentinel-LLM-Token-Ratelimit-WaitingTime)
    • 错误码(ErrorCode)
    • 错误信息(ErrorMessage)
  • 请求被接收时,返回以下内容
    • headers
      • 请求id(X-Sentinel-LLM-Token-Ratelimit-RequestID)
      • token剩余信息(X-Sentinel-LLM-Token-Ratelimit-RemainingTokens)

其中错误码和错误信息从配置文件中获取。

用户接入

接入步骤

从用户角度,接入Sentinel提供的Token限流功能,需要以下几步:

  1. 准备Redis实例

  2. 对 Sentinel 的运行环境进行相关配置并初始化。

    1. 仅支持从yaml文件初始化
  3. 埋点(定义资源),固定ResourceType=ResTypeCommonTrafficType=Inbound的资源类型

  4. 根据下面的配置文件加载规则,规则配置项包括:资源名称、限流策略、具体规则项、redis配置、错误码、错误信息。如下是配置规则的示例,具体字段含义在下文的“配置文件描述”中有具体说明。

    _, err = llmtokenratelimit.LoadRules([]*llmtokenratelimit.Rule{
        {
    
            Resource: ".*",
            Strategy: llmtokenratelimit.FixedWindow,
            SpecificItems: []llmtokenratelimit.SpecificItem{
                {
                    Identifier: llmtokenratelimit.Identifier{
                        Type:  llmtokenratelimit.Header,
                        Value: ".*",
                    },
                    KeyItems: []llmtokenratelimit.KeyItem{
                        {
                            Key:      ".*",
                            Token: llmtokenratelimit.Token{
                                Number:        1000,
                                CountStrategy: llmtokenratelimit.TotalTokens,
                            },
                            Time: llmtokenratelimit.Time{
                                Unit:  llmtokenratelimit.Second,
                                Value: 60,
                            },
                        },
                    },
                },
            },
        },
    })
  5. 可选:创建LLM实例嵌入到提供的适配器中即可

配置文件描述

总体规则配置

配置项 类型 必填 默认值 说明
enabled bool false 是否启用LLM Token限流功能,取值:false(不启用)、true(启用)
rules array of rule object nil 限流规则
redis object redis实例连接信息
errorCode int 429 错误码,设置为0时会修改为429
errorMessage string "Too Many Requests" 错误信息

rule配置

配置项 类型 必填 默认值 说明
resource string ".*" 规则资源名称,支持正则表达式,取值:".*"(全局匹配)、用户自定义正则表达式
strategy string "fixed-window" 限流策略,取值:fixed-window(固定窗口)、peta(预测误差时序分摊)
encoding object token编码方式,专用于peta限流策略
specificItems array of specificItem object 具体规则项

encoding配置

配置项 类型 必填 默认值 说明
provider string "openai" 模型厂商
model string "gpt-4" 模型名称

specificItem配置

配置项 类型 必填 默认值 说明
identifier object 请求标识符
keyItems array of keyItem object 规则匹配的键值信息

identifier配置

配置项 类型 必填 默认值 说明
type string "all" 请求标识符类型,取值:all(全局限流)、header
value string ".*" 请求标识符取值,支持正则表达式,取值:".*"(全局匹配)、用户自定义正则表达式

keyItem配置

配置项 类型 必填 默认值 说明
key string ".*" 具体规则项取值,支持正则表达式,取值:".*"(全局匹配)、用户自定义正则表达式
token object token数量和计算策略配置
time object 时间单位和周期配置

token配置

配置项 类型 必填 默认值 说明
number int token数量
countStrategy string "total-tokens" token计算策略,取值:input-tokens、output-tokens、total-tokens

time配置

配置项 类型 必填 默认值 说明
unit string 时间单位,取值:second、minute、hour、day
value int 时间值

redis配置

配置项 类型 必填 默认值 说明
addrs array of addr object [{name: "127.0.0.1", port: 6379}] redis节点服务,见注意事项说明
username string 空字符串 redis用户名
password string 空字符串 redis密码
dialTimeout int 0 建立redis连接的最长等待时间,单位:毫秒
readTimeout int 0 等待Redis服务器响应的最长时间,单位:毫秒
writeTimeout int 0 向网络连接发送命令数据的最长时间,单位:毫秒
poolTimeout int 0 从连接池获取一个空闲连接的最大等待时间,单位:毫秒
poolSize int 10 连接池中的连接数量
minIdleConns int 5 连接池闲置连接的最少数量
maxRetries int 3 操作失败,最大尝试次数

addr配置

配置项 类型 必填 默认值 说明
name string "127.0.0.1" redis节点服务名称,带服务类型的完整 FQDN 名称,例如 my-redis.dns、redis.my-ns.svc.cluster.local
port int 6379 redis节点服务端口

总体配置文件示例

version: "v1"
sentinel:
  app:
    name: sentinel-go-demo
  log:
    metric:
      maxFileCount: 7
  llmTokenRatelimit:
  	enabled: true,
    rules:
      - resource: ".*"
        strategy: "fixed-window"
        specificItems:
          - identifier:
              type: "header"
              value: ".*"
            keyItems:
              - key: ".*"
                token: 
                  number: 1000
                  countStrategy: "total-tokens"
                time:
                  unit: "second"
                  value: 60

    errorCode: 429
    errorMessage: "Too Many Requests"
    
    redis:
      addrs:
        - name: "127.0.0.1"
          port: 6379
      username: "redis"
      password: "redis"
      dialTimeout: 5000
      readTimeout: 5000
      writeTimeout: 5000
      poolTimeout: 5000
      poolSize: 10
      minIdleConns: 5
      maxRetries: 3

LLM框架适配

目前支持Langchaingo和Eino框架无侵入式接入Sentinel提供的Token限流能力,主要适用于文本生成方面,使用方法详见

  • pkg/adapters/langchaingo/wrapper.go
  • pkg/adapters/eino/wrapper.go

注意事项

  • PETA使用tiktoken预估输入消耗token数,但是需要下载或预先配置字节对编码(Byte Pair Encoding,BPE)字典
    • 在线模式
      • 首次使用时,tiktoken需要联网下载编码文件
    • 离线模式
      • 预先准备缓存好的tiktoken的编码文件(非直接下载文件,而是经过tiktoken处理后的文件),并通过配置TIKTOKEN_CACHE_DIR环境变量指定文件目录位置
  • 规则去重说明
    • keyItems中,若仅number不同,会去重保留最新的number
    • specificItems中,仅保留去重后的keyItems
    • resource中,仅保留最新的resource
  • redis配置说明
    • 若连接的redis是集群模式,那么addrs里面的地址数量必须大于等于2个,否则会默认进入redis单点模式,导致限流失效

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions