Skip to content

Commit 5a6932f

Browse files
okkezWatson1978
andauthored
Add buffer_size_limit and buffer_overflow_method (#68)
* Add buffer_size_limit and buffer_overflow_method Signed-off-by: Kenji Okimoto <okimoto@clear-code.com> * Fix link to Filter Plugin Overview Signed-off-by: Shizuo Fujita <fujita@clear-code.com> --------- Signed-off-by: Kenji Okimoto <okimoto@clear-code.com> Signed-off-by: Shizuo Fujita <fujita@clear-code.com> Co-authored-by: Shizuo Fujita <fujita@clear-code.com>
1 parent 304cc59 commit 5a6932f

File tree

3 files changed

+200
-3
lines changed

3 files changed

+200
-3
lines changed

README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,13 @@ Or install it yourself as:
2727

2828
$ gem install fluent-plugin-concat
2929

30+
## Plugin helpers
31+
32+
* [timer](https://docs.fluentd.org/v1.0/articles/api-plugin-helper-timer)
33+
* [event_emitter](https://docs.fluentd.org/v1.0/articles/api-plugin-helper-event_emitter)
34+
35+
* See also: [Filter Plugin Overview](https://docs.fluentd.org/filter#overview)
36+
3037
## Configuration
3138

3239
### Example
@@ -79,6 +86,20 @@ Or install it yourself as:
7986
|use\_partial\_cri\_logtag|bool (optional)|Use cri log tag to concatenate multiple records||
8087
|partial\_cri\_logtag\_key|string (optional)|The key name that is referred to concatenate records on cri log||
8188
|partial\_cri\_stream\_key|string (optional)|The key name that is referred to detect stream name on cri log|`stream`|
89+
|buffer\_limit\_size|size (optional)|The max size of each buffer|`512000`|
90+
91+
|parameter|description|available values|default|
92+
|---|---|---|---|
93+
|buffer\_overflow\_method|The method used when the buffer overflows|`ignore`, `truncate`, `drop`, `new`|`ignore`|
94+
95+
* `ignore`
96+
* Append the current record to the buffer anyway, ignoring the size limit
97+
* `truncate`
98+
* Drop the current record and flush the buffer
99+
* `drop`
100+
* Drop the current record and clear the buffer
101+
* `new`
102+
* Flush the buffer and store the current record in the next buffer
82103

83104
## Usage
84105

lib/fluent/plugin/filter_concat.rb

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class ConcatFilter < Filter
2222
config_param :stream_identity_key, :string, default: nil
2323
desc "The interval between data flushes, 0 means disable timeout"
2424
config_param :flush_interval, :time, default: 60
25-
desc "The label name to handle timeout"
25+
desc "The label name to handle events caused by timeout"
2626
config_param :timeout_label, :string, default: nil
2727
desc "Use timestamp of first record when buffer is flushed"
2828
config_param :use_first_timestamp, :bool, default: false
@@ -44,6 +44,10 @@ class ConcatFilter < Filter
4444
config_param :partial_cri_logtag_key, :string, default: nil
4545
desc "The key name that is referred to detect stream name on cri log"
4646
config_param :partial_cri_stream_key, :string, default: "stream"
47+
desc "The max size of each buffer"
48+
config_param :buffer_limit_size, :size, default: 500 * 1024 # 500k
49+
desc "The method if overflow buffer"
50+
config_param :buffer_overflow_method, :enum, list: [:ignore, :truncate, :drop, :new], default: :ignore
4751

4852
class TimeoutError < StandardError
4953
end
@@ -52,6 +56,7 @@ def initialize
5256
super
5357

5458
@buffer = Hash.new {|h, k| h[k] = [] }
59+
@buffer_size = Hash.new(0)
5560
@timeout_map_mutex = Thread::Mutex.new
5661
@timeout_map_mutex.synchronize do
5762
@timeout_map = Hash.new {|h, k| h[k] = Fluent::Engine.now }
@@ -270,13 +275,39 @@ def process_line(stream_identity, tag, time, record)
270275

271276
# Handles one record in partial-key mode and applies the configured
# buffer overflow policy.
#
# The record is normally appended to the buffer of +stream_identity+. When
# adding it would exceed the size limit (see #overflow?), the behaviour
# depends on @buffer_overflow_method:
#
# * :ignore   - append the record anyway
# * :truncate - drop the record and flush what is buffered
# * :drop     - drop the record and discard what is buffered
# * :new      - flush what is buffered, then start a new buffer with the record
#
# A record whose @partial_key value differs from @partial_value ends the
# message and flushes the buffer.
#
# @param stream_identity [Object] key into @buffer and @buffer_size
# @param tag [String] tag of the incoming event
# @param time [Object] time of the incoming event
# @param record [Hash] the incoming record
# @return [Fluent::MultiEventStream] events completed by this record
#   (possibly empty)
def process_partial(stream_identity, tag, time, record)
  new_es = Fluent::MultiEventStream.new
  last_part = @partial_value != record[@partial_key]

  # Flushes the current buffer into new_es. Nothing is emitted when the
  # buffer is empty: flush_buffer appears to merge into the first buffered
  # record, so calling it on an empty buffer would fail.
  # The incoming event time is never overwritten here, so a record stored
  # afterwards (the :new policy) keeps its own timestamp.
  emit_buffered = lambda do
    unless @buffer[stream_identity].empty?
      first_time, merged_record = flush_buffer(stream_identity)
      merged_record.delete(@partial_key)
      new_es.add(@use_first_timestamp ? first_time : time, merged_record)
    end
  end

  force_flush = false
  if overflow?(stream_identity, record)
    case @buffer_overflow_method
    when :ignore
      @buffer[stream_identity] << [tag, time, record]
    when :truncate, :new
      force_flush = true
    when :drop
      @buffer[stream_identity] = []
    end
  else
    @buffer[stream_identity] << [tag, time, record]
  end

  emit_buffered.call if force_flush || last_part

  if force_flush && @buffer_overflow_method == :new
    # The old buffer has been flushed; the current record opens the next one.
    @buffer[stream_identity] << [tag, time, record]
    # to_s keeps non-String keys/values from raising on bytesize.
    @buffer_size[stream_identity] =
      record.keys.sum { |key| key.to_s.bytesize } +
      record.values.sum { |value| value.to_s.bytesize }
    emit_buffered.call if last_part
  end

  new_es
end
282313

@@ -371,6 +402,17 @@ def continuous_line?(text)
371402
end
372403
end
373404

405+
# Tells whether adding +record+ would push the buffer of +stream_identity+
# past @buffer_limit_size, and updates the running size in @buffer_size.
#
# The size of a record is the sum of the byte sizes of its keys and values.
# Keys and values are converted with to_s first, so non-String values
# (numbers, nil, nested structures) and Symbol keys are measured by their
# string form instead of raising NoMethodError.
#
# Side effects: on overflow the tracked size for the stream is reset to 0;
# otherwise the record size is added to it.
#
# @param stream_identity [Object] key into @buffer_size
# @param record [Hash] the record about to be buffered
# @return [Boolean] true when the limit would be exceeded
def overflow?(stream_identity, record)
  size = record.keys.sum { |key| key.to_s.bytesize } +
         record.values.sum { |value| value.to_s.bytesize }
  if @buffer_size[stream_identity] + size > @buffer_limit_size
    @buffer_size[stream_identity] = 0
    true
  else
    @buffer_size[stream_identity] += size
    false
  end
end
415+
374416
def flush_buffer(stream_identity, new_element = nil)
375417
lines = if @mode == :partial_metadata
376418
@buffer[stream_identity]
@@ -385,6 +427,7 @@ def flush_buffer(stream_identity, new_element = nil)
385427
}
386428
@buffer[stream_identity] = []
387429
@buffer[stream_identity] << new_element if new_element
430+
@buffer_size[stream_identity] = 0
388431
[time, first_record.merge(new_record)]
389432
end
390433

@@ -434,3 +477,18 @@ def handle_timeout_error(tag, time, record, message)
434477
end
435478
end
436479
end
480+
481+
class Array
  # Backport of Array#sum for Ruby 2.3 or earlier, which lacks it.
  # Defined only when the running Ruby does not already provide #sum.
  unless [].respond_to?(:sum)
    # Adds up the elements starting from 0. When a block is given, the
    # block's result for each element is added instead of the element.
    def sum
      total = 0
      each do |element|
        total += block_given? ? yield(element) : element
      end
      total
    end
  end
end

test/plugin/test_filter_concat.rb

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1069,4 +1069,122 @@ def filter_with_time(conf, messages, wait: nil)
10691069
assert_equal(expected, filtered)
10701070
end
10711071
end
1072+
1073+
sub_test_case "overflow" do
  # Input shared by every overflow test: the same four-part message sent
  # twice. Only the last part ("end") carries partial_message "false".
  # Measured as key bytes + value bytes, "start" is 44 bytes, each
  # " message N" is 49 bytes and "end" is 43 bytes.
  def overflow_messages
    [
      ["start", " message 1", " message 2", "end"],
      ["start", " message 3", " message 4", "end"],
    ].flat_map do |parts|
      parts.map do |text|
        {
          "container_id" => "1",
          "message" => text,
          "partial_message" => (text == "end" ? "false" : "true"),
        }
      end
    end
  end

  # Builds the filter configuration for a given size limit and policy.
  def overflow_config(limit, overflow_method)
    <<-CONFIG
      key message
      partial_key partial_message
      partial_value true
      keep_partial_key false
      buffer_limit_size #{limit}
      buffer_overflow_method #{overflow_method}
    CONFIG
  end

  # Expected output records, one per given message text.
  def overflow_expected(*texts)
    texts.map { |text| { "container_id" => "1", "message" => text } }
  end

  # Limit 10: every record overflows but is appended anyway.
  test "ignore" do
    filtered = filter(overflow_config(10, "ignore"), overflow_messages, wait: 3)
    expected = overflow_expected(
      "start\n message 1\n message 2\nend",
      "start\n message 3\n message 4\nend"
    )
    assert_equal(expected, filtered)
  end

  # Limit 60: the overflowing record is dropped and the buffer is flushed.
  test "truncate" do
    filtered = filter(overflow_config(60, "truncate"), overflow_messages, wait: 3)
    expected = overflow_expected("start", " message 2", "start", " message 4")
    assert_equal(expected, filtered)
  end

  # Limit 100: the overflowing record and the buffered parts are discarded.
  test "drop" do
    filtered = filter(overflow_config(100, "drop"), overflow_messages, wait: 3)
    expected = overflow_expected("end", "end")
    assert_equal(expected, filtered)
  end

  # Limit 90: each overflow flushes the buffer and starts a new one.
  test "new" do
    filtered = filter(overflow_config(90, "new"), overflow_messages, wait: 3)
    expected = overflow_expected(
      "start", " message 1", " message 2", "end",
      "start", " message 3", " message 4", "end"
    )
    assert_equal(expected, filtered)
  end
end
10721190
end

0 commit comments

Comments
 (0)