Skip to content

Add buffer_limit_size and buffer_overflow_method #68

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ Or install it yourself as:

$ gem install fluent-plugin-concat

## Plugin helpers

* [timer](https://docs.fluentd.org/v1.0/articles/api-plugin-helper-timer)
* [event_emitter](https://docs.fluentd.org/v1.0/articles/api-plugin-helper-event_emitter)

* See also: [Filter Plugin Overview](https://docs.fluentd.org/filter#overview)

## Configuration

### Example
Expand Down Expand Up @@ -79,6 +86,20 @@ Or install it yourself as:
|use\_partial\_cri\_logtag|bool (optional)|Use cri log tag to concatenate multiple records||
|partial\_cri\_logtag\_key|string (optional)|The key name that is referred to concatenate records on cri log||
|partial\_cri\_stream\_key|string (optional)|The key name that is referred to detect stream name on cri log|`stream`|
|buffer\_limit\_size|The max size of each buffer|`512000`|

|parameter|description|available values|default|
|---|---|---|---|
|buffer\_overflow\_method|The method if overflow buffer|`ignore`, `truncate`, `drop`, `new`|`ignore`|

* `ignore`
* Concatenate the current record in the buffer
* `truncate`
* Drop the current record and flush the buffer
* `drop`
* Drop the current record and clear the buffer
* `new`
* Flush the buffer and store the current record in next buffer

## Usage

Expand Down
64 changes: 61 additions & 3 deletions lib/fluent/plugin/filter_concat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class ConcatFilter < Filter
config_param :stream_identity_key, :string, default: nil
desc "The interval between data flushes, 0 means disable timeout"
config_param :flush_interval, :time, default: 60
desc "The label name to handle timeout"
desc "The label name to handle events caused by timeout"
config_param :timeout_label, :string, default: nil
desc "Use timestamp of first record when buffer is flushed"
config_param :use_first_timestamp, :bool, default: false
Expand All @@ -44,6 +44,10 @@ class ConcatFilter < Filter
config_param :partial_cri_logtag_key, :string, default: nil
desc "The key name that is referred to detect stream name on cri log"
config_param :partial_cri_stream_key, :string, default: "stream"
desc "The max size of each buffer"
config_param :buffer_limit_size, :size, default: 500 * 1024 # 500k
desc "The method if overflow buffer"
config_param :buffer_overflow_method, :enum, list: [:ignore, :truncate, :drop, :new], default: :ignore

class TimeoutError < StandardError
end
Expand All @@ -52,6 +56,7 @@ def initialize
super

@buffer = Hash.new {|h, k| h[k] = [] }
@buffer_size = Hash.new(0)
@timeout_map_mutex = Thread::Mutex.new
@timeout_map_mutex.synchronize do
@timeout_map = Hash.new {|h, k| h[k] = Fluent::Engine.now }
Expand Down Expand Up @@ -270,13 +275,39 @@ def process_line(stream_identity, tag, time, record)

def process_partial(stream_identity, tag, time, record)
new_es = Fluent::MultiEventStream.new
@buffer[stream_identity] << [tag, time, record]
unless @partial_value == record[@partial_key]
force_flush = false
if overflow?(stream_identity, record)
force_flush = case @buffer_overflow_method
when :ignore
@buffer[stream_identity] << [tag, time, record]
false
when :truncate
true
when :drop
@buffer[stream_identity] = []
false
when :new
true
end
else
@buffer[stream_identity] << [tag, time, record]
end
if force_flush || @partial_value != record[@partial_key]
new_time, new_record = flush_buffer(stream_identity)
time = new_time if @use_first_timestamp
new_record.delete(@partial_key)
new_es.add(time, new_record)
end
if force_flush && @buffer_overflow_method == :new
@buffer[stream_identity] << [tag, time, record]
@buffer_size[stream_identity] = record.keys.sum(&:bytesize) + record.values.sum(&:bytesize)
if @partial_value != record[@partial_key]
new_time, new_record = flush_buffer(stream_identity)
time = new_time if @use_first_timestamp
new_record.delete(@partial_key)
new_es.add(time, new_record)
end
end
new_es
end

Expand Down Expand Up @@ -371,6 +402,17 @@ def continuous_line?(text)
end
end

def overflow?(stream_identity, record)
size = record.keys.sum(&:bytesize) + record.values.sum(&:bytesize)
if @buffer_size[stream_identity] + size > @buffer_limit_size
@buffer_size[stream_identity] = 0
true
else
@buffer_size[stream_identity] += size
false
end
end

def flush_buffer(stream_identity, new_element = nil)
lines = if @mode == :partial_metadata
@buffer[stream_identity]
Expand All @@ -385,6 +427,7 @@ def flush_buffer(stream_identity, new_element = nil)
}
@buffer[stream_identity] = []
@buffer[stream_identity] << new_element if new_element
@buffer_size[stream_identity] = 0
[time, first_record.merge(new_record)]
end

Expand Down Expand Up @@ -434,3 +477,18 @@ def handle_timeout_error(tag, time, record, message)
end
end
end

class Array
# Support Ruby 2.3 or earlier
unless [].respond_to?(:sum)
def sum
inject(0) do |memo, value|
if block_given?
memo + yield(value)
else
memo + value
end
end
end
end
end
118 changes: 118 additions & 0 deletions test/plugin/test_filter_concat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1069,4 +1069,122 @@ def filter_with_time(conf, messages, wait: nil)
assert_equal(expected, filtered)
end
end

sub_test_case "overflow" do
test "ignore" do
config = <<-CONFIG
key message
partial_key partial_message
partial_value true
keep_partial_key false
buffer_limit_size 10
buffer_overflow_method ignore
CONFIG
messages = [
{ "container_id" => "1", "message" => "start", "partial_message" => "true" },
{ "container_id" => "1", "message" => " message 1", "partial_message" => "true" },
{ "container_id" => "1", "message" => " message 2", "partial_message" => "true" },
{ "container_id" => "1", "message" => "end", "partial_message" => "false" },
{ "container_id" => "1", "message" => "start", "partial_message" => "true" },
{ "container_id" => "1", "message" => " message 3", "partial_message" => "true" },
{ "container_id" => "1", "message" => " message 4", "partial_message" => "true" },
{ "container_id" => "1", "message" => "end", "partial_message" => "false" },
]
filtered = filter(config, messages, wait: 3)
expected = [
{ "container_id" => "1", "message" => "start\n message 1\n message 2\nend" },
{ "container_id" => "1", "message" => "start\n message 3\n message 4\nend" },
]
assert_equal(expected, filtered)
end

test "truncate" do
config = <<-CONFIG
key message
partial_key partial_message
partial_value true
keep_partial_key false
buffer_limit_size 60
buffer_overflow_method truncate
CONFIG
messages = [
{ "container_id" => "1", "message" => "start", "partial_message" => "true" },
{ "container_id" => "1", "message" => " message 1", "partial_message" => "true" },
{ "container_id" => "1", "message" => " message 2", "partial_message" => "true" },
{ "container_id" => "1", "message" => "end", "partial_message" => "false" },
{ "container_id" => "1", "message" => "start", "partial_message" => "true" },
{ "container_id" => "1", "message" => " message 3", "partial_message" => "true" },
{ "container_id" => "1", "message" => " message 4", "partial_message" => "true" },
{ "container_id" => "1", "message" => "end", "partial_message" => "false" },
]
filtered = filter(config, messages, wait: 3)
expected = [
{ "container_id" => "1", "message" => "start" },
{ "container_id" => "1", "message" => " message 2" },
{ "container_id" => "1", "message" => "start" },
{ "container_id" => "1", "message" => " message 4" },
]
assert_equal(expected, filtered)
end

test "drop" do
config = <<-CONFIG
key message
partial_key partial_message
partial_value true
keep_partial_key false
buffer_limit_size 100
buffer_overflow_method drop
CONFIG
messages = [
{ "container_id" => "1", "message" => "start", "partial_message" => "true" },
{ "container_id" => "1", "message" => " message 1", "partial_message" => "true" },
{ "container_id" => "1", "message" => " message 2", "partial_message" => "true" },
{ "container_id" => "1", "message" => "end", "partial_message" => "false" },
{ "container_id" => "1", "message" => "start", "partial_message" => "true" },
{ "container_id" => "1", "message" => " message 3", "partial_message" => "true" },
{ "container_id" => "1", "message" => " message 4", "partial_message" => "true" },
{ "container_id" => "1", "message" => "end", "partial_message" => "false" },
]
filtered = filter(config, messages, wait: 3)
expected = [
{ "container_id" => "1", "message" => "end" },
{ "container_id" => "1", "message" => "end" },
]
assert_equal(expected, filtered)
end

test "new" do
config = <<-CONFIG
key message
partial_key partial_message
partial_value true
keep_partial_key false
buffer_limit_size 90
buffer_overflow_method new
CONFIG
messages = [
{ "container_id" => "1", "message" => "start", "partial_message" => "true" },
{ "container_id" => "1", "message" => " message 1", "partial_message" => "true" },
{ "container_id" => "1", "message" => " message 2", "partial_message" => "true" },
{ "container_id" => "1", "message" => "end", "partial_message" => "false" },
{ "container_id" => "1", "message" => "start", "partial_message" => "true" },
{ "container_id" => "1", "message" => " message 3", "partial_message" => "true" },
{ "container_id" => "1", "message" => " message 4", "partial_message" => "true" },
{ "container_id" => "1", "message" => "end", "partial_message" => "false" },
]
filtered = filter(config, messages, wait: 3)
expected = [
{ "container_id" => "1", "message" => "start" },
{ "container_id" => "1", "message" => " message 1" },
{ "container_id" => "1", "message" => " message 2" },
{ "container_id" => "1", "message" => "end" },
{ "container_id" => "1", "message" => "start" },
{ "container_id" => "1", "message" => " message 3" },
{ "container_id" => "1", "message" => " message 4" },
{ "container_id" => "1", "message" => "end" },
]
assert_equal(expected, filtered)
end
end
end