Skip to content

Commit a7c62f1

Browse files
authored
Merge pull request #8 from lxzan/dev
use time cache
2 parents d087ce1 + 7ea4932 commit a7c62f1

File tree

9 files changed

+759
-437
lines changed

9 files changed

+759
-437
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@
33
vendor/
44
examples/
55
bin/
6-
go.work
6+
go.work
7+
.DS_Store

.golangci.yaml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
linters:
2+
enable-all: true
3+
# Disable specific linter
4+
# https://golangci-lint.run/usage/linters/#disabled-by-default
5+
disable:
6+
- testpackage
7+
- nosnakecase
8+
- nlreturn
9+
- gomnd
10+
- forcetypeassert
11+
- wsl
12+
- whitespace
13+
- prealloc
14+
- ineffassign
15+
- lll
16+
- funlen
17+
- scopelint
18+
- dupl
19+
- gofumpt
20+
- gofmt
21+
- godot
22+
- gci
23+
- goimports
24+
- gocognit
25+
- ifshort
26+
- gochecknoinits
27+
- predeclared
28+
- containedctx
29+
# Enable presets.
30+
# https://golangci-lint.run/usage/linters
31+
# Run only fast linters from enabled linters set (first run won't be fast)
32+
# Default: false
33+
fast: true

benchmark/benchmark_test.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
package benchmark
22

33
import (
4-
"github.com/dgraph-io/ristretto"
5-
"github.com/lxzan/memorycache"
6-
"github.com/lxzan/memorycache/internal/utils"
74
"sync/atomic"
85
"testing"
96
"time"
7+
8+
"github.com/dgraph-io/ristretto"
9+
"github.com/lxzan/memorycache"
10+
"github.com/lxzan/memorycache/internal/utils"
1011
)
1112

1213
const benchcount = 1000000
@@ -52,6 +53,20 @@ func BenchmarkMemoryCache_Get(b *testing.B) {
5253
})
5354
}
5455

56+
func BenchmarkMemoryCache_GetOrCreate(b *testing.B) {
57+
var mc = memorycache.New(
58+
memorycache.WithBucketNum(128),
59+
memorycache.WithBucketSize(1000, 10000),
60+
)
61+
var i = atomic.Int64{}
62+
b.RunParallel(func(pb *testing.PB) {
63+
for pb.Next() {
64+
index := i.Add(1) % benchcount
65+
mc.GetOrCreate(benchkeys[index], 1, time.Hour)
66+
}
67+
})
68+
}
69+
5570
func BenchmarkRistretto_Set(b *testing.B) {
5671
var mc, _ = ristretto.NewCache(&ristretto.Config{
5772
NumCounters: 1e7, // number of keys to track frequency of (10M).

cache.go

Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
package memorycache
2+
3+
import (
4+
"context"
5+
"github.com/lxzan/memorycache/internal/utils"
6+
"hash/maphash"
7+
"math"
8+
"strings"
9+
"sync"
10+
"sync/atomic"
11+
"time"
12+
)
13+
14+
type MemoryCache struct {
15+
config *config
16+
storage []*bucket
17+
seed maphash.Seed
18+
timestamp atomic.Int64
19+
ctx context.Context
20+
cancel context.CancelFunc
21+
wg sync.WaitGroup
22+
once sync.Once
23+
}
24+
25+
// New 创建缓存数据库实例
26+
// Creating a Cached Database Instance
27+
func New(options ...Option) *MemoryCache {
28+
var c = &config{}
29+
options = append(options, withInitialize())
30+
for _, fn := range options {
31+
fn(c)
32+
}
33+
34+
mc := &MemoryCache{
35+
config: c,
36+
storage: make([]*bucket, c.BucketNum),
37+
seed: maphash.MakeSeed(),
38+
wg: sync.WaitGroup{},
39+
once: sync.Once{},
40+
}
41+
mc.wg.Add(2)
42+
mc.ctx, mc.cancel = context.WithCancel(context.Background())
43+
mc.timestamp.Store(time.Now().UnixMilli())
44+
45+
for i, _ := range mc.storage {
46+
mc.storage[i] = &bucket{
47+
Map: make(map[string]*Element, c.InitialSize),
48+
Heap: newHeap(c.InitialSize),
49+
}
50+
}
51+
52+
go func() {
53+
var d0 = c.MaxInterval
54+
var ticker = time.NewTicker(d0)
55+
defer ticker.Stop()
56+
57+
for {
58+
select {
59+
case <-mc.ctx.Done():
60+
mc.wg.Done()
61+
return
62+
case <-ticker.C:
63+
var sum = 0
64+
var now = time.Now().UnixMilli()
65+
for _, b := range mc.storage {
66+
sum += b.ExpireCheck(now, c.MaxKeysDeleted)
67+
}
68+
69+
// 删除数量超过阈值, 缩小时间间隔
70+
if d1 := utils.SelectValue(sum > c.BucketNum*c.MaxKeysDeleted*7/10, c.MinInterval, c.MaxInterval); d1 != d0 {
71+
d0 = d1
72+
ticker.Reset(d0)
73+
}
74+
}
75+
}
76+
}()
77+
78+
// 每秒更新一次时间戳
79+
go func() {
80+
var ticker = time.NewTicker(time.Second)
81+
defer ticker.Stop()
82+
83+
for {
84+
select {
85+
case <-mc.ctx.Done():
86+
mc.wg.Done()
87+
return
88+
case <-ticker.C:
89+
mc.timestamp.Store(time.Now().UnixMilli())
90+
}
91+
}
92+
}()
93+
94+
return mc
95+
}
96+
97+
func (c *MemoryCache) Stop() {
98+
c.once.Do(func() {
99+
c.cancel()
100+
c.wg.Wait()
101+
})
102+
}
103+
104+
func (c *MemoryCache) getBucket(key string) *bucket {
105+
var idx = maphash.String(c.seed, key) & uint64(c.config.BucketNum-1)
106+
return c.storage[idx]
107+
}
108+
109+
// 获取过期时间, d<=0表示永不过期
110+
func (c *MemoryCache) getExp(d time.Duration) int64 {
111+
if d <= 0 {
112+
return math.MaxInt
113+
}
114+
return c.timestamp.Load() + d.Milliseconds()
115+
}
116+
117+
// 查找数据. 如果存在且超时, 删除并返回false
118+
func (c *MemoryCache) fetch(b *bucket, key string) (*Element, bool) {
119+
v, exist := b.Map[key]
120+
if !exist {
121+
return nil, false
122+
}
123+
124+
if v.expired(c.timestamp.Load()) {
125+
b.Heap.Delete(v.index)
126+
delete(b.Map, key)
127+
v.cb(v, ReasonExpired)
128+
return nil, false
129+
}
130+
131+
return v, true
132+
}
133+
134+
// 检查容量溢出
135+
func (c *MemoryCache) overflow(b *bucket) {
136+
if b.Heap.Len() > c.config.MaxCapacity {
137+
head := b.Heap.Pop()
138+
delete(b.Map, head.Key)
139+
head.cb(head, ReasonOverflow)
140+
}
141+
}
142+
143+
// Clear 清空所有缓存
144+
// clear all caches
145+
func (c *MemoryCache) Clear() {
146+
for _, b := range c.storage {
147+
b.Lock()
148+
b.Heap = newHeap(c.config.InitialSize)
149+
b.Map = make(map[string]*Element, c.config.InitialSize)
150+
b.Unlock()
151+
}
152+
}
153+
154+
// Set 设置键值和过期时间. exp<=0表示永不过期.
155+
// Set the key value and expiration time. exp<=0 means never expire.
156+
func (c *MemoryCache) Set(key string, value any, exp time.Duration) (replaced bool) {
157+
return c.SetWithCallback(key, value, exp, emptyCallback)
158+
}
159+
160+
// SetWithCallback 设置键值, 过期时间和回调函数. 容量溢出和过期都会触发回调.
161+
// Set the key value, expiration time and callback function. The callback is triggered by both capacity overflow and expiration.
162+
func (c *MemoryCache) SetWithCallback(key string, value any, exp time.Duration, cb CallbackFunc) (replaced bool) {
163+
var b = c.getBucket(key)
164+
b.Lock()
165+
defer b.Unlock()
166+
167+
var expireAt = c.getExp(exp)
168+
v, ok := c.fetch(b, key)
169+
if ok {
170+
v.Value = value
171+
v.ExpireAt = expireAt
172+
v.cb = cb
173+
b.Heap.Down(v.index, b.Heap.Len())
174+
return true
175+
}
176+
177+
var ele = &Element{Key: key, Value: value, ExpireAt: expireAt, cb: cb}
178+
b.Heap.Push(ele)
179+
b.Map[key] = ele
180+
c.overflow(b)
181+
return false
182+
}
183+
184+
// Get
185+
func (c *MemoryCache) Get(key string) (v any, exist bool) {
186+
var b = c.getBucket(key)
187+
b.Lock()
188+
defer b.Unlock()
189+
result, ok := c.fetch(c.getBucket(key), key)
190+
if !ok {
191+
return nil, false
192+
}
193+
return result.Value, true
194+
}
195+
196+
// GetWithTTL 获取. 如果存在, 刷新过期时间.
197+
// Get a value. If it exists, refreshes the expiration time.
198+
func (c *MemoryCache) GetWithTTL(key string, exp time.Duration) (v any, exist bool) {
199+
var b = c.getBucket(key)
200+
b.Lock()
201+
defer b.Unlock()
202+
203+
result, ok := c.fetch(c.getBucket(key), key)
204+
if !ok {
205+
return nil, false
206+
}
207+
208+
result.ExpireAt = c.getExp(exp)
209+
b.Heap.Down(result.index, b.Heap.Len())
210+
return result.Value, true
211+
}
212+
213+
// GetOrCreate 如果存在, 刷新过期时间. 如果不存在, 创建一个新的.
214+
// Get or create a value. If it exists, refreshes the expiration time. If it does not exist, creates a new one.
215+
func (c *MemoryCache) GetOrCreate(key string, value any, exp time.Duration) (v any, exist bool) {
216+
return c.GetOrCreateWithCallback(key, value, exp, emptyCallback)
217+
}
218+
219+
// GetOrCreateWithCallback 如果存在, 刷新过期时间. 如果不存在, 创建一个新的.
220+
// Get or create a value with CallbackFunc. If it exists, refreshes the expiration time. If it does not exist, creates a new one.
221+
func (c *MemoryCache) GetOrCreateWithCallback(key string, value any, exp time.Duration, cb CallbackFunc) (v any, exist bool) {
222+
var b = c.getBucket(key)
223+
b.Lock()
224+
defer b.Unlock()
225+
226+
expireAt := c.getExp(exp)
227+
result, ok := c.fetch(c.getBucket(key), key)
228+
if ok {
229+
result.ExpireAt = expireAt
230+
b.Heap.Down(result.index, b.Heap.Len())
231+
return result.Value, true
232+
}
233+
234+
var ele = &Element{Key: key, Value: value, ExpireAt: expireAt, cb: cb}
235+
b.Heap.Push(ele)
236+
b.Map[key] = ele
237+
c.overflow(b)
238+
return value, false
239+
}
240+
241+
// Delete
242+
func (c *MemoryCache) Delete(key string) (deleted bool) {
243+
var b = c.getBucket(key)
244+
b.Lock()
245+
defer b.Unlock()
246+
247+
v, ok := c.fetch(b, key)
248+
if !ok {
249+
return false
250+
}
251+
252+
b.Heap.Delete(v.index)
253+
delete(b.Map, key)
254+
v.cb(v, ReasonDeleted)
255+
return true
256+
}
257+
258+
// Keys 获取前缀匹配的key
259+
// Get prefix matching key
260+
func (c *MemoryCache) Keys(prefix string) []string {
261+
var arr = make([]string, 0)
262+
var now = time.Now().UnixMilli()
263+
for _, b := range c.storage {
264+
b.Lock()
265+
for _, v := range b.Heap.Data {
266+
if !v.expired(now) && strings.HasPrefix(v.Key, prefix) {
267+
arr = append(arr, v.Key)
268+
}
269+
}
270+
b.Unlock()
271+
}
272+
return arr
273+
}
274+
275+
// Len 获取当前元素数量
276+
// Get the number of Elements
277+
func (c *MemoryCache) Len() int {
278+
var num = 0
279+
for _, b := range c.storage {
280+
b.Lock()
281+
num += b.Heap.Len()
282+
b.Unlock()
283+
}
284+
return num
285+
}
286+
287+
type bucket struct {
288+
sync.Mutex
289+
Map map[string]*Element
290+
Heap *heap
291+
}
292+
293+
// 过期时间检查
294+
func (c *bucket) ExpireCheck(now int64, num int) int {
295+
c.Lock()
296+
defer c.Unlock()
297+
298+
var sum = 0
299+
for c.Heap.Len() > 0 && c.Heap.Front().expired(now) && sum < num {
300+
head := c.Heap.Pop()
301+
delete(c.Map, head.Key)
302+
sum++
303+
head.cb(head, ReasonExpired)
304+
}
305+
return sum
306+
}

0 commit comments

Comments
 (0)