diff --git a/README.md b/README.md index 38b69e9..b455c13 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,13 @@ Or install it yourself as: $ gem install fluent-plugin-concat +## Plugin helpers + +* [timer](https://docs.fluentd.org/v1.0/articles/api-plugin-helper-timer) +* [event_emitter](https://docs.fluentd.org/v1.0/articles/api-plugin-helper-event_emitter) + +* See also: [Filter Plugin Overview](https://docs.fluentd.org/filter#overview) + ## Configuration ### Example @@ -79,6 +86,20 @@ Or install it yourself as: |use\_partial\_cri\_logtag|bool (optional)|Use cri log tag to concatenate multiple records|| |partial\_cri\_logtag\_key|string (optional)|The key name that is referred to concatenate records on cri log|| |partial\_cri\_stream\_key|string (optional)|The key name that is referred to detect stream name on cri log|`stream`| +|buffer\_limit\_size|size (optional)|The max size of each buffer|`512000`| + +|parameter|description|available values|default| +|---|---|---|---| +|buffer\_overflow\_method|The method to use when the buffer overflows|`ignore`, `truncate`, `drop`, `new`|`ignore`| + +* `ignore` + * Concatenate the current record into the buffer +* `truncate` + * Drop the current record and flush the buffer +* `drop` + * Drop the current record and clear the buffer +* `new` + * Flush the buffer and store the current record in the next buffer ## Usage diff --git a/lib/fluent/plugin/filter_concat.rb b/lib/fluent/plugin/filter_concat.rb index 6905599..04e7ffb 100644 --- a/lib/fluent/plugin/filter_concat.rb +++ b/lib/fluent/plugin/filter_concat.rb @@ -22,7 +22,7 @@ class ConcatFilter < Filter config_param :stream_identity_key, :string, default: nil desc "The interval between data flushes, 0 means disable timeout" config_param :flush_interval, :time, default: 60 - desc "The label name to handle timeout" + desc "The label name to handle events caused by timeout" config_param :timeout_label, :string, default: nil desc "Use timestamp of first record when buffer is flushed" config_param 
:use_first_timestamp, :bool, default: false @@ -44,6 +44,10 @@ class ConcatFilter < Filter config_param :partial_cri_logtag_key, :string, default: nil desc "The key name that is referred to detect stream name on cri log" config_param :partial_cri_stream_key, :string, default: "stream" + desc "The max size of each buffer" + config_param :buffer_limit_size, :size, default: 500 * 1024 # 500k + desc "The method if overflow buffer" + config_param :buffer_overflow_method, :enum, list: [:ignore, :truncate, :drop, :new], default: :ignore class TimeoutError < StandardError end @@ -52,6 +56,7 @@ def initialize super @buffer = Hash.new {|h, k| h[k] = [] } + @buffer_size = Hash.new(0) @timeout_map_mutex = Thread::Mutex.new @timeout_map_mutex.synchronize do @timeout_map = Hash.new {|h, k| h[k] = Fluent::Engine.now } @@ -270,13 +275,39 @@ def process_line(stream_identity, tag, time, record) def process_partial(stream_identity, tag, time, record) new_es = Fluent::MultiEventStream.new - @buffer[stream_identity] << [tag, time, record] - unless @partial_value == record[@partial_key] + force_flush = false + if overflow?(stream_identity, record) + force_flush = case @buffer_overflow_method + when :ignore + @buffer[stream_identity] << [tag, time, record] + false + when :truncate + true + when :drop + @buffer[stream_identity] = [] + false + when :new + true + end + else + @buffer[stream_identity] << [tag, time, record] + end + if force_flush || @partial_value != record[@partial_key] new_time, new_record = flush_buffer(stream_identity) time = new_time if @use_first_timestamp new_record.delete(@partial_key) new_es.add(time, new_record) end + if force_flush && @buffer_overflow_method == :new + @buffer[stream_identity] << [tag, time, record] + @buffer_size[stream_identity] = record.keys.sum(&:bytesize) + record.values.sum(&:bytesize) + if @partial_value != record[@partial_key] + new_time, new_record = flush_buffer(stream_identity) + time = new_time if @use_first_timestamp + 
new_record.delete(@partial_key) + new_es.add(time, new_record) + end + end new_es end @@ -371,6 +402,17 @@ def continuous_line?(text) end end + def overflow?(stream_identity, record) + size = record.keys.sum(&:bytesize) + record.values.sum(&:bytesize) + if @buffer_size[stream_identity] + size > @buffer_limit_size + @buffer_size[stream_identity] = 0 + true + else + @buffer_size[stream_identity] += size + false + end + end + def flush_buffer(stream_identity, new_element = nil) lines = if @mode == :partial_metadata @buffer[stream_identity] @@ -385,6 +427,7 @@ def flush_buffer(stream_identity, new_element = nil) } @buffer[stream_identity] = [] @buffer[stream_identity] << new_element if new_element + @buffer_size[stream_identity] = 0 [time, first_record.merge(new_record)] end @@ -434,3 +477,18 @@ def handle_timeout_error(tag, time, record, message) end end end + +class Array + # Support Ruby 2.3 or earlier + unless [].respond_to?(:sum) + def sum + inject(0) do |memo, value| + if block_given? 
+ memo + yield(value) + else + memo + value + end + end + end + end +end diff --git a/test/plugin/test_filter_concat.rb b/test/plugin/test_filter_concat.rb index 0c765e6..ba11449 100644 --- a/test/plugin/test_filter_concat.rb +++ b/test/plugin/test_filter_concat.rb @@ -1069,4 +1069,122 @@ def filter_with_time(conf, messages, wait: nil) assert_equal(expected, filtered) end end + + sub_test_case "overflow" do + test "ignore" do + config = <<-CONFIG + key message + partial_key partial_message + partial_value true + keep_partial_key false + buffer_limit_size 10 + buffer_overflow_method ignore + CONFIG + messages = [ + { "container_id" => "1", "message" => "start", "partial_message" => "true" }, + { "container_id" => "1", "message" => " message 1", "partial_message" => "true" }, + { "container_id" => "1", "message" => " message 2", "partial_message" => "true" }, + { "container_id" => "1", "message" => "end", "partial_message" => "false" }, + { "container_id" => "1", "message" => "start", "partial_message" => "true" }, + { "container_id" => "1", "message" => " message 3", "partial_message" => "true" }, + { "container_id" => "1", "message" => " message 4", "partial_message" => "true" }, + { "container_id" => "1", "message" => "end", "partial_message" => "false" }, + ] + filtered = filter(config, messages, wait: 3) + expected = [ + { "container_id" => "1", "message" => "start\n message 1\n message 2\nend" }, + { "container_id" => "1", "message" => "start\n message 3\n message 4\nend" }, + ] + assert_equal(expected, filtered) + end + + test "truncate" do + config = <<-CONFIG + key message + partial_key partial_message + partial_value true + keep_partial_key false + buffer_limit_size 60 + buffer_overflow_method truncate + CONFIG + messages = [ + { "container_id" => "1", "message" => "start", "partial_message" => "true" }, + { "container_id" => "1", "message" => " message 1", "partial_message" => "true" }, + { "container_id" => "1", "message" => " message 2", 
"partial_message" => "true" }, + { "container_id" => "1", "message" => "end", "partial_message" => "false" }, + { "container_id" => "1", "message" => "start", "partial_message" => "true" }, + { "container_id" => "1", "message" => " message 3", "partial_message" => "true" }, + { "container_id" => "1", "message" => " message 4", "partial_message" => "true" }, + { "container_id" => "1", "message" => "end", "partial_message" => "false" }, + ] + filtered = filter(config, messages, wait: 3) + expected = [ + { "container_id" => "1", "message" => "start" }, + { "container_id" => "1", "message" => " message 2" }, + { "container_id" => "1", "message" => "start" }, + { "container_id" => "1", "message" => " message 4" }, + ] + assert_equal(expected, filtered) + end + + test "drop" do + config = <<-CONFIG + key message + partial_key partial_message + partial_value true + keep_partial_key false + buffer_limit_size 100 + buffer_overflow_method drop + CONFIG + messages = [ + { "container_id" => "1", "message" => "start", "partial_message" => "true" }, + { "container_id" => "1", "message" => " message 1", "partial_message" => "true" }, + { "container_id" => "1", "message" => " message 2", "partial_message" => "true" }, + { "container_id" => "1", "message" => "end", "partial_message" => "false" }, + { "container_id" => "1", "message" => "start", "partial_message" => "true" }, + { "container_id" => "1", "message" => " message 3", "partial_message" => "true" }, + { "container_id" => "1", "message" => " message 4", "partial_message" => "true" }, + { "container_id" => "1", "message" => "end", "partial_message" => "false" }, + ] + filtered = filter(config, messages, wait: 3) + expected = [ + { "container_id" => "1", "message" => "end" }, + { "container_id" => "1", "message" => "end" }, + ] + assert_equal(expected, filtered) + end + + test "new" do + config = <<-CONFIG + key message + partial_key partial_message + partial_value true + keep_partial_key false + buffer_limit_size 90 + 
buffer_overflow_method new + CONFIG + messages = [ + { "container_id" => "1", "message" => "start", "partial_message" => "true" }, + { "container_id" => "1", "message" => " message 1", "partial_message" => "true" }, + { "container_id" => "1", "message" => " message 2", "partial_message" => "true" }, + { "container_id" => "1", "message" => "end", "partial_message" => "false" }, + { "container_id" => "1", "message" => "start", "partial_message" => "true" }, + { "container_id" => "1", "message" => " message 3", "partial_message" => "true" }, + { "container_id" => "1", "message" => " message 4", "partial_message" => "true" }, + { "container_id" => "1", "message" => "end", "partial_message" => "false" }, + ] + filtered = filter(config, messages, wait: 3) + expected = [ + { "container_id" => "1", "message" => "start" }, + { "container_id" => "1", "message" => " message 1" }, + { "container_id" => "1", "message" => " message 2" }, + { "container_id" => "1", "message" => "end" }, + { "container_id" => "1", "message" => "start" }, + { "container_id" => "1", "message" => " message 3" }, + { "container_id" => "1", "message" => " message 4" }, + { "container_id" => "1", "message" => "end" }, + ] + assert_equal(expected, filtered) + end + end end