Ruby Version: ruby 2.7.8p225 (2023-03-30 revision 1f4d455848) [x86_64-linux] Fluentd Version (Fluentd is built from source): fluentd 1.16.1
We have a custom input plugin that opens a TCP port and listens on it, receiving log messages from rsyslog. The input plugin parses each log (using a custom mechanism, not Fluentd's parsers) and converts it into a key-value JSON object according to business logic. It then emits the event with a tag, a timestamp, and the JSON object.
The custom output plugin receives that tag, timestamp, and JSON object. Based on the tag it decides the Elasticsearch index, builds a payload containing the JSON object plus the required index metadata, and makes an API call to push the data into Elasticsearch.
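For clarity, the tag-to-index decision described above can be sketched in plain Ruby (the constant and method names here are illustrative, not from the actual plugin):

```ruby
# Minimal sketch of the routing logic: the index name is derived from the
# first tag segment plus the record's "type" field.
# AUDIT_TYPES is a hypothetical stand-in for the plugin's $auditlogs list.
AUDIT_TYPES = ["login", "access"].freeze

def target_index_for(tag, record)
  prefix = tag.split("_").first
  if AUDIT_TYPES.include?(record["type"])
    "_#{prefix}_audit_#{record["type"]}"
  else
    "_#{prefix}_audit_auditdata"
  end
end
```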
Code snippet that emits the event:
# The #start method opens a TCP socket using Ruby's Cool.io library.
# ... business logic for conversion ...
@log.debug "Kcsv: JSON - #{record}"
if record.length > 0
  @log.info("IUnidkhkhfkwehfkhewifhewifhweihfncnndndndndndndndn HELEOELEOLEOLEO")
  router.emit(tag, time, record)
end
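One way to rule out a silently swallowed exception during emit is to wrap the call in a rescue that logs and re-raises. The sketch below stubs `router` with a recording object; in the real plugin, Fluentd injects the event router:

```ruby
require 'logger'

# Debugging sketch: surface any error raised by router.emit in the input
# plugin's own log instead of letting it disappear. RecordingRouter is a
# hypothetical stand-in for Fluentd's EventRouter.
class RecordingRouter
  attr_reader :events
  def initialize; @events = []; end
  def emit(tag, time, record); @events << [tag, time, record]; end
end

router = RecordingRouter.new
log = Logger.new($stdout)

tag, time, record = "app_kcsv", Time.now.to_i, { "field1" => "value1" }
begin
  router.emit(tag, time, record)
rescue => e
  log.error "emit failed: #{e.class}: #{e.message}"
  raise
end
```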
Code snippet that receives the emitted events and makes the API call:
def write(chunk)
  @log.info(chunk)
  bulk_message = []
  eventhub_messages = []
  chunk.msgpack_each do |tag, time, record|
    @log.info "kelasticsearch write ==> tag: #{tag} time : #{time} record #{record}"
    target_index = ""
    type_name = record["type"]
    splittag = tag.split("_")
    if time.to_i > 1396357123
      next
      # NOTE: the lines below are unreachable -- `next` skips the rest of this
      # iteration, so any record with a timestamp after this epoch is dropped.
      if record.include? "audittype"
        record.merge!({"@ts" => time})
      else
        record.merge!({"@timestamp" => time})
      end
    else
      if record.include? "audittype"
        record.merge!({"@ts" => time})
      else
        record.merge!({"@timestamp" => time})
      end
      if $auditlogs.include? record["type"]
        target_index = "_#{splittag[0]}_audit_#{record["type"]}"
      else
        target_index = "_#{splittag[0]}_audit_auditdata"
      end
    end
    meta = { "index" => { "_index" => target_index, "_type" => "_doc" } }
    @log.info "meta #{meta}"
    bulk_message << Yajl::Encoder.encode(meta)
    bulk_message << Yajl::Encoder.encode(record)
    @log.info "index #{target_index}"
    unless bulk_message.size < @bulkidxsize
      handle_bulk_msg(bulk_message)
      bulk_message.clear
    end
  end
  handle_bulk_msg(bulk_message)
end
Code snippet that makes the API call:
$klog = @log  # global used so handle_bulk_msg can log

def handle_bulk_msg(body)
  begin
    $klog.info("Inside http_bulk_msg")
    http = Net::HTTP.new(@host, @port_i)
    request = Net::HTTP::Post.new('/_bulk', {'content-type' => 'application/json; charset=utf-8'})
    # Net::HTTP expects the body to be a String, but `body` is an Array of
    # JSON-encoded lines; the _bulk API also requires a trailing newline.
    request.body = body.join("\n") + "\n"
    request.basic_auth("admin", "Elas#1234")
    http.request(request).value
  rescue Timeout::Error, Errno::EINVAL, Errno::ECONNRESET, EOFError,
         Net::HTTPBadResponse, Net::HTTPHeaderSyntaxError, Net::ProtocolError => e
    $klog.error e.backtrace.join("\n")
  end
end
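As a side note on the request body: `Net::HTTP` expects a String, while `bulk_message` is an Array, and the `_bulk` endpoint requires newline-delimited JSON terminated by a final newline. A minimal sketch of building such a body, using the stdlib JSON encoder in place of Yajl (the helper name is hypothetical):

```ruby
require 'json'

# Sketch: turn alternating meta/record hashes into a valid _bulk body string.
# Each hash becomes one NDJSON line; the body must end with "\n".
def build_bulk_body(hashes)
  hashes.map { |h| JSON.generate(h) }.join("\n") + "\n"
end

meta   = { "index" => { "_index" => "_app_audit_auditdata", "_type" => "_doc" } }
record = { "type" => "auditdata", "@timestamp" => 1686041974 }
body   = build_bulk_body([meta, record])
```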
Have a look at the logs below. The input plugin writes to kcsv.log, fluentd writes to fluentd.log, and the output plugin writes to tdagent_elasticsearch.log. All of these files are tailed together so the log lines appear in the order they occur.
==> ../tdagent/tdagent_kcsv.log <==
D, [2023-06-06T14:09:34.112400 #35453] DEBUG -- : Kcsv: JSON - {"field1"=>"value1", "f2"=>"v2", "f3"=>"v3", ...., "f4"=>"v4"}
I, [2023-06-06T14:09:34.112459 #35453] INFO -- : IUnidkhkhfkwehfkhewifhewifhweihfncnndndndndndndndn HELEOELEOLEOLEO
==> fluentd.log <==
2023-06-06 14:09:34 +0530 [trace]: #0 fluent/log.rb:317:trace: writing events into buffer instance=1560 metadata_size=1
==> ../tdagent/tdagent_kcsv.log <==
I, [2023-06-06T14:09:34.113302 #35453] INFO -- : closed fluent socket object_id=2620
==> fluentd.log <==
2023-06-06 14:09:34 +0530 [trace]: #0 fluent/log.rb:317:trace: writing events into buffer instance=1560 metadata_size=1
2023-06-06 14:09:34 +0530 [trace]: #0 fluent/log.rb:317:trace: enqueueing all chunks in buffer instance=1560
2023-06-06 14:09:34 +0530 [trace]: #0 fluent/log.rb:317:trace: writing events into buffer instance=1560 metadata_size=1
2023-06-06 14:09:40 +0530 [trace]: #0 fluent/log.rb:317:trace: enqueueing all chunks in buffer instance=1560
2023-06-06 14:09:40 +0530 [trace]: #0 fluent/log.rb:317:trace: writing events into buffer instance=1560 metadata_size=1
2023-06-06 14:09:45 +0530 [trace]: #0 fluent/log.rb:317:trace: enqueueing all chunks in buffer instance=1560
2023-06-06 14:09:45 +0530 [trace]: #0 fluent/log.rb:317:trace: writing events into buffer instance=1560 metadata_size=1
From the logs we can verify that the input plugin converts the log into JSON as desired, but when we emit the event, the event seems not to be emitted: none of the output plugin's log messages appear in its log file, and nothing is inserted into the Elasticsearch indices.
The output plugin is being initialized and registered; we verified that using a debug statement in the output plugin's initialize method.
We need the community's help to debug this issue.
Also, the fluentd workers are dying repeatedly and restarting after a few messages like the ones below.
2023-06-06 14:09:45 +0530 [trace]: #0 fluent/log.rb:317:trace: enqueueing all chunks in buffer instance=1560
2023-06-06 14:09:45 +0530 [trace]: #0 fluent/log.rb:317:trace: writing events into buffer instance=1560 metadata_size=1
From the logs I can see that it adds data into the buffer queue but does not dequeue it; before dequeuing, it restarts. Sometimes, when it does dequeue, it ends up in the error below and restarts. It appears to end with some thread-related error, I believe. Looking at the last few lines of the logs below, it restarts and, strangely, only re-registers the llelasticsearch plugin (the name of the custom plugin).
Any help is highly appreciated. Thanks in advance.
Let me know if more information is required.