Skip to content

Commit 36a8ed6

Browse files
moremindqicz
andauthored
[type:feat] add log module (#85)
* [type:feat] add log module * [type:feat] add log module * [type:feat] add log module * [type:feat] add license header Co-authored-by: Qicz <qiczzhu@gmail.com>
1 parent 1c33da8 commit 36a8ed6

File tree

4 files changed

+503
-0
lines changed

4 files changed

+503
-0
lines changed

log/async_bufwriter.go

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
* Copyright (c) 2022, AcmeStack
3+
* All rights reserved.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package log
19+
20+
import (
21+
"bytes"
22+
"errors"
23+
"io"
24+
"sync"
25+
"time"
26+
)
27+
28+
const (
29+
FlushSize = 10240
30+
FlushTime = 5 * time.Millisecond
31+
)
32+
33+
type AsyncBufferLogWriter struct {
34+
wait sync.WaitGroup
35+
stopChan chan bool
36+
logChan chan []byte
37+
logBuffer bytes.Buffer
38+
FlushSize int64
39+
w io.Writer
40+
block bool
41+
once sync.Once
42+
}
43+
44+
type Config struct {
45+
// triggers refresh data size threshold
46+
FlushSize int64
47+
48+
// The size of the async cache, which may block or return an error if exceeded
49+
BufferSize int
50+
51+
// trigger refresh timeInterval
52+
FlushInterval time.Duration
53+
54+
// if true, data overflow bufSize will make Write method blocking.
55+
// if false, data overflow will return error.
56+
Block bool
57+
}
58+
59+
var defaultConfig = Config{
60+
FlushSize: FlushSize,
61+
BufferSize: 360,
62+
FlushInterval: FlushTime,
63+
Block: true,
64+
}
65+
66+
// NewAsyncBufferWriter Write data with Buffer, this Writer and Closer is thread safety, but WriteCloser parameters not safety.
67+
// @param w Writer data
68+
// @param c Writer config
69+
// @return *AsyncBufferLogWriter
70+
func NewAsyncBufferWriter(w io.Writer, c ...Config) *AsyncBufferLogWriter {
71+
conf := defaultConfig
72+
if len(c) > 0 {
73+
conf = c[0]
74+
if conf.FlushInterval == 0 {
75+
conf.FlushInterval = FlushTime
76+
}
77+
if conf.BufferSize == 0 {
78+
conf.BufferSize = BufferSize
79+
}
80+
if conf.FlushSize == 0 {
81+
conf.FlushSize = FlushSize
82+
}
83+
}
84+
85+
l := AsyncBufferLogWriter{
86+
stopChan: make(chan bool),
87+
logChan: make(chan []byte, conf.BufferSize),
88+
FlushSize: conf.FlushSize,
89+
w: w,
90+
block: conf.Block,
91+
}
92+
l.wait.Add(1)
93+
l.logBuffer.Grow(conf.BufferSize * 10)
94+
95+
go func() {
96+
defer l.wait.Done()
97+
defer func() {
98+
if w != nil {
99+
size := len(l.logChan)
100+
for i := 0; i < size; i++ {
101+
l.writeLog(<-l.logChan)
102+
}
103+
l.Flush()
104+
}
105+
}()
106+
ticker := time.NewTicker(conf.FlushInterval)
107+
defer ticker.Stop()
108+
for {
109+
select {
110+
case <-l.stopChan:
111+
return
112+
case <-ticker.C:
113+
l.Flush()
114+
default:
115+
}
116+
select {
117+
case <-l.stopChan:
118+
return
119+
case d, ok := <-l.logChan:
120+
if ok {
121+
l.writeLog(d)
122+
}
123+
}
124+
}
125+
}()
126+
return &l
127+
}
128+
129+
// Flush data.
130+
// @receiver w AsyncBufferLogWriter pointer point address
131+
// @return error
132+
func (w *AsyncBufferLogWriter) Flush() error {
133+
_, err := w.w.Write(w.logBuffer.Bytes())
134+
if err != nil {
135+
return err
136+
}
137+
w.logBuffer.Reset()
138+
return nil
139+
}
140+
141+
// writeLog private method, write log.
142+
// @receiver w AsyncBufferLogWriter pointer point address
143+
// @param data log data
144+
// @return error
145+
func (w *AsyncBufferLogWriter) writeLog(data []byte) error {
146+
w.logBuffer.Write(data)
147+
148+
if int64(w.logBuffer.Len()) < w.FlushSize {
149+
return nil
150+
}
151+
152+
return w.Flush()
153+
}
154+
155+
// Close close writer buffer
156+
// @receiver w
157+
// @return error
158+
func (w *AsyncBufferLogWriter) Close() error {
159+
w.once.Do(func() {
160+
close(w.stopChan)
161+
w.wait.Wait()
162+
})
163+
return nil
164+
}
165+
166+
func (w *AsyncBufferLogWriter) Write(data []byte) (n int, err error) {
167+
if len(data) == 0 {
168+
return 0, nil
169+
}
170+
if w.block {
171+
select {
172+
case w.logChan <- data:
173+
return len(data), nil
174+
case <-w.stopChan:
175+
return 0, errors.New("writer is closed")
176+
}
177+
} else {
178+
select {
179+
case w.logChan <- data:
180+
return len(data), nil
181+
case <-w.stopChan:
182+
return 0, errors.New("writer is closed")
183+
default:
184+
return 0, errors.New("write log failed ")
185+
}
186+
}
187+
}

log/async_writer.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright (c) 2022, AcmeStack
3+
* All rights reserved.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package log
19+
20+
import (
21+
"errors"
22+
"io"
23+
"sync"
24+
)
25+
26+
const (
27+
BufferSize = 10240
28+
)
29+
30+
type AsyncLogWriter struct {
31+
stopChan chan bool
32+
logChan chan []byte
33+
w io.Writer
34+
block bool
35+
wait sync.WaitGroup
36+
once sync.Once
37+
}
38+
39+
// NewAsyncWriter Write data with Buffer, this Writer and Closer is thread safety, but WriteCloser parameters not safety.
40+
// @param w Writer
41+
// @param bufSize accept buffer max length
42+
// @param block if true, overflow buffer size, will blocking, if false will occur error
43+
// @return *AsyncLogWriter
44+
func NewAsyncWriter(w io.Writer, bufSize int, block bool) *AsyncLogWriter {
45+
if bufSize <= 0 {
46+
bufSize = BufferSize
47+
}
48+
l := AsyncLogWriter{
49+
stopChan: make(chan bool),
50+
logChan: make(chan []byte, bufSize),
51+
w: w,
52+
block: block,
53+
}
54+
l.wait.Add(1)
55+
56+
go func() {
57+
defer l.wait.Done()
58+
for {
59+
select {
60+
case <-l.stopChan:
61+
return
62+
case d, ok := <-l.logChan:
63+
if ok {
64+
l.writeLog(d)
65+
}
66+
}
67+
}
68+
}()
69+
return &l
70+
}
71+
72+
func (w *AsyncLogWriter) writeLog(data []byte) {
73+
if w.w != nil {
74+
w.w.Write(data)
75+
}
76+
}
77+
78+
func (w *AsyncLogWriter) Close() error {
79+
w.once.Do(func() {
80+
close(w.stopChan)
81+
w.wait.Wait()
82+
})
83+
return nil
84+
}
85+
86+
func (w *AsyncLogWriter) Write(data []byte) (n int, err error) {
87+
if len(data) == 0 {
88+
return 0, nil
89+
}
90+
91+
if w.block {
92+
w.logChan <- data
93+
return len(data), nil
94+
} else {
95+
select {
96+
case w.logChan <- data:
97+
return len(data), nil
98+
default:
99+
return 0, errors.New("write log failed ")
100+
}
101+
}
102+
}

0 commit comments

Comments
 (0)