From 41b1cda11a8bff5433917367cfd935848bd220c6 Mon Sep 17 00:00:00 2001 From: tckeong Date: Thu, 19 Jun 2025 00:01:03 +0800 Subject: [PATCH 1/8] Create CloudWatch Logger Backend --- lib/cadet/logger/cloudwatch_logger.ex | 142 ++++++++++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100644 lib/cadet/logger/cloudwatch_logger.ex diff --git a/lib/cadet/logger/cloudwatch_logger.ex b/lib/cadet/logger/cloudwatch_logger.ex new file mode 100644 index 000000000..840bbf918 --- /dev/null +++ b/lib/cadet/logger/cloudwatch_logger.ex @@ -0,0 +1,142 @@ +defmodule Cadet.Logger.CloudWatchLogger do + @moduledoc """ + A custom Logger backend that sends logs to AWS CloudWatch. + This backend can be configured to log at different levels and formats, + and can include specific metadata in the logs. + """ + + @behaviour :gen_event + require Logger + + defstruct [:level, :format, :metadata, :log_group, :log_stream] + + @impl true + def init({__MODULE__, opts}) when is_list(opts) do + config = configure_merge(read_env(), opts) + {:ok, init(config, %__MODULE__{})} + end + + @impl true + def init({__MODULE__, name}) when is_atom(name) do + config = read_env() + {:ok, init(config, %__MODULE__{})} + end + + @impl true + def handle_call({:configure, options}, state) do + {:ok, :ok, configure(options, state)} + end + + @impl true + def handle_event({level, _gl, {Logger, msg, ts, md}}, state) do + %{format: format, metadata: metadata} = state + + if meet_level?(level, state.level) do + formatted_msg = Logger.Formatter.format(format, level, msg, ts, take_metadata(md, metadata)) + spawn(fn -> send_to_cloudwatch(formatted_msg, state) end) + end + + {:ok, state} + end + + def handle_event(_, state), do: {:ok, state} + def handle_call(_, state), do: {:ok, :ok, state} + def handle_info(_, state), do: {:ok, state} + + # Helpers + defp configure(options, state) do + config = configure_merge(read_env(), options) + Application.put_env(:logger, __MODULE__, config) + init(config, state) + end + + defp meet_level?(_lvl, nil), do: true + + defp meet_level?(lvl, min) do + Logger.compare_levels(lvl, min) != :lt + end + + defp send_to_cloudwatch(msg, state) do + %{log_group: log_group, log_stream: log_stream} = state + + # Ensure that the already have ExAws authentication configured + if :ets.whereis(ExAws.Config.AuthCache) != :undefined do + # The headers and body structure can be found in the AWS API documentation: + # https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html + operation = %ExAws.Operation.JSON{ + http_method: :post, + service: :logs, + headers: [ + {"x-amz-target", "Logs_20140328.PutLogEvents"}, + {"content-type", "application/x-amz-json-1.1"} + ], + data: %{ + "logGroupName" => log_group, + "logStreamName" => log_stream, + "logEvents" => [ + %{ + "timestamp" => :os.system_time(:millisecond), + "message" => msg + } + ] + } + } + + operation + |> ExAws.request() + |> case do + {:ok, _response} -> + :ok + + {:error, reason} -> + Logger.error("Failed to send log to CloudWatch: #{inspect(reason)}") + end + else + Logger.error("ExAws.Config.AuthCache is not available. Cannot send logs to CloudWatch.") + end + end + + defp init(config, state) do + level = Keyword.get(config, :level) + format = Logger.Formatter.compile(Keyword.get(config, :format)) + metadata = Keyword.get(config, :metadata, []) |> configure_metadata() + log_group = Keyword.get(config, :log_group, "cadet-logs") + log_stream = Keyword.get(config, :log_stream, "#{node()}-#{:os.system_time(:second)}") + + %{ + state + | level: level, + format: format, + metadata: metadata, + log_group: log_group, + log_stream: log_stream + } + end + + defp configure_metadata(:all), do: :all + defp configure_metadata(metadata), do: Enum.reverse(metadata) + + defp take_metadata(metadata, :all) do + metadata + end + + defp take_metadata(metadata, keys) do + Enum.reduce(keys, [], fn key, acc -> + case Keyword.fetch(metadata, key) do + {:ok, val} -> [{key, val} | acc] + :error -> acc + end + end) + end + + defp read_env do + Application.get_env(:logger, __MODULE__, Application.get_env(:logger, :cloudwatch_logger, [])) + end + + defp configure_merge(env, options) do + Keyword.merge(env, options, fn + :colors, v1, v2 -> Keyword.merge(v1, v2) + _, _v1, v2 -> v2 + end) + end +end From 045a55e79c8e73b6d4d1ab63f5faa6006d7f2d51 Mon Sep 17 00:00:00 2001 From: tckeong Date: Thu, 19 Jun 2025 00:23:11 +0800 Subject: [PATCH 2/8] Modified prod config --- config/prod.exs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/config/prod.exs b/config/prod.exs index c41ba0cb1..c6f38d24f 100644 --- a/config/prod.exs +++ b/config/prod.exs @@ -3,6 +3,17 @@ import Config # Do not print debug messages in production config :logger, level: :info +# Add the CloudWatch logger backend in production +config :logger, backends: [:console, {Cadet.Logger.CloudWatchLogger, :cloudwatch_logger}] + +# Configure CloudWatch Logger +config :logger, :cloudwatch_logger, + level: :info, + format: "$time $metadata[$level] $message\n", + metadata: [:request_id], + log_group: "cadet-logs", + log_stream: "#{node()}-#{:os.system_time(:second)}" + # ## SSL Support # # To get SSL working, you will need to add the `https` key From 31c54f325e651347d0c50ca4eaec6b01a886dd6d Mon Sep 17 00:00:00 2001 From: tckeong Date: Thu, 19 Jun 2025 00:38:42 +0800 Subject: [PATCH 3/8] Fix credo issue --- lib/cadet/logger/cloudwatch_logger.ex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/cadet/logger/cloudwatch_logger.ex b/lib/cadet/logger/cloudwatch_logger.ex index 840bbf918..3fdb8206e 100644 --- a/lib/cadet/logger/cloudwatch_logger.ex +++ b/lib/cadet/logger/cloudwatch_logger.ex @@ -99,7 +99,8 @@ defmodule Cadet.Logger.CloudWatchLogger do defp init(config, state) do level = Keyword.get(config, :level) format = Logger.Formatter.compile(Keyword.get(config, :format)) - metadata = Keyword.get(config, :metadata, []) |> configure_metadata() + raw_metadata = Keyword.get(config, :metadata, []) + metadata = configure_metadata(raw_metadata) log_group = Keyword.get(config, :log_group, "cadet-logs") log_stream = Keyword.get(config, :log_stream, "#{node()}-#{:os.system_time(:second)}") From d6c92e18b80e52d274dd348c11d9cdcd83b3a11e Mon Sep 17 00:00:00 2001 From: tckeong Date: Thu, 19 Jun 2025 15:48:16 +0800 Subject: [PATCH 4/8] Add docs --- lib/cadet/logger/cloudwatch_logger.ex | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/cadet/logger/cloudwatch_logger.ex b/lib/cadet/logger/cloudwatch_logger.ex index 3fdb8206e..d34b0ea38 100644 --- a/lib/cadet/logger/cloudwatch_logger.ex +++ b/lib/cadet/logger/cloudwatch_logger.ex @@ -134,9 +134,12 @@ defmodule Cadet.Logger.CloudWatchLogger do Application.get_env(:logger, __MODULE__, Application.get_env(:logger, :cloudwatch_logger, [])) end + @doc """ + Merges the given options with the existing environment configuration. + If a key exists in both, the value from `options` will take precedence. + """ defp configure_merge(env, options) do Keyword.merge(env, options, fn - :colors, v1, v2 -> Keyword.merge(v1, v2) _, _v1, v2 -> v2 end) end From d271b445ad52650a3971fe9e2c802f87aeca05fd Mon Sep 17 00:00:00 2001 From: tckeong Date: Thu, 19 Jun 2025 19:32:04 +0800 Subject: [PATCH 5/8] Refactor the CloudWatch Logger --- lib/cadet/logger/cloudwatch_logger.ex | 95 ++++++++++++++++++--------- 1 file changed, 64 insertions(+), 31 deletions(-) diff --git a/lib/cadet/logger/cloudwatch_logger.ex b/lib/cadet/logger/cloudwatch_logger.ex index d34b0ea38..fa1aa56a5 100644 --- a/lib/cadet/logger/cloudwatch_logger.ex +++ b/lib/cadet/logger/cloudwatch_logger.ex @@ -33,7 +33,7 @@ defmodule Cadet.Logger.CloudWatchLogger do if meet_level?(level, state.level) do formatted_msg = Logger.Formatter.format(format, level, msg, ts, take_metadata(md, metadata)) - spawn(fn -> send_to_cloudwatch(formatted_msg, state) end) + send_to_cloudwatch_async(formatted_msg, state) end {:ok, state} @@ -56,46 +56,78 @@ defmodule Cadet.Logger.CloudWatchLogger do Logger.compare_levels(lvl, min) != :lt end + defp send_to_cloudwatch_async(msg, state) do + Task.start(fn -> send_to_cloudwatch(msg, state) end) + end + defp send_to_cloudwatch(msg, state) do %{log_group: log_group, log_stream: log_stream} = state # Ensure that the already have ExAws authentication configured - if :ets.whereis(ExAws.Config.AuthCache) != :undefined do - # The headers and body structure can be found in the AWS API documentation: - # https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html - operation = %ExAws.Operation.JSON{ - http_method: :post, - service: :logs, - headers: [ - {"x-amz-target", "Logs_20140328.PutLogEvents"}, - {"content-type", "application/x-amz-json-1.1"} - ], - data: %{ - "logGroupName" => log_group, - "logStreamName" => log_stream, - "logEvents" => [ - %{ - "timestamp" => :os.system_time(:millisecond), - "message" => msg - } - ] - } - } + with :ok <- check_exaws_config() do + operation = build_log_operation(msg, state) operation - |> ExAws.request() - |> case do - {:ok, _response} -> - :ok + |> send_with_retry() + end + end - {:error, reason} -> - Logger.error("Failed to send log to CloudWatch: #{inspect(reason)}") - end + defp build_log_operation(msg, state) do + %{log_group: log_group, log_stream: log_stream} = state + + # The headers and body structure can be found in the AWS API documentation: + # https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html + %ExAws.Operation.JSON{ + http_method: :post, + service: :logs, + headers: [ + {"x-amz-target", "Logs_20140328.PutLogEvents"}, + {"content-type", "application/x-amz-json-1.1"} + ], + data: %{ + "logGroupName" => log_group, + "logStreamName" => log_stream, + "logEvents" => [ + %{ + "timestamp" => :os.system_time(:millisecond), + "message" => msg + } + ] + } + } + end + + defp check_exaws_config do + if :ets.whereis(ExAws.Config.AuthCache) == :undefined do + Logger.error( + "ExAws.Config.AuthCache is not available. Please ensure ExAws is properly configured." + ) + + :error else - Logger.error("ExAws.Config.AuthCache is not available. Cannot send logs to CloudWatch.") + :ok end end + defp send_with_retry(operation, retries \\ 3) + + defp send_with_retry(operation, retries) when retries > 0 do + case ExAws.request(operation) do + {:ok, _response} -> + :ok + + {:error, reason} -> + Logger.error("Failed to send log to CloudWatch: #{inspect(reason)}. Retrying...") + # Wait before retrying + :timer.sleep(500) + send_with_retry(operation, retries - 1) + end + end + + defp send_with_retry(_, 0) do + Logger.error("Failed to send log to CloudWatch after multiple retries.") + end + defp init(config, state) do level = Keyword.get(config, :level) format = Logger.Formatter.compile(Keyword.get(config, :format)) @@ -134,10 +166,11 @@ defmodule Cadet.Logger.CloudWatchLogger do Application.get_env(:logger, __MODULE__, Application.get_env(:logger, :cloudwatch_logger, [])) end - @doc """ + """ Merges the given options with the existing environment configuration. If a key exists in both, the value from `options` will take precedence. """ + defp configure_merge(env, options) do Keyword.merge(env, options, fn _, _v1, v2 -> v2 From 7e62a59f72733934ef2c142e83aba0f45b471e88 Mon Sep 17 00:00:00 2001 From: tckeong Date: Fri, 20 Jun 2025 00:58:49 +0800 Subject: [PATCH 6/8] Refactor CloudWatch Logger, add buffer to the logs --- lib/cadet/logger/cloudwatch_logger.ex | 119 +++++++++++++++++++++----- 1 file changed, 98 insertions(+), 21 deletions(-) diff --git a/lib/cadet/logger/cloudwatch_logger.ex b/lib/cadet/logger/cloudwatch_logger.ex index fa1aa56a5..da83d9f38 100644 --- a/lib/cadet/logger/cloudwatch_logger.ex +++ b/lib/cadet/logger/cloudwatch_logger.ex @@ -8,7 +8,12 @@ defmodule Cadet.Logger.CloudWatchLogger do @behaviour :gen_event require Logger - defstruct [:level, :format, :metadata, :log_group, :log_stream] + defstruct [:level, :format, :metadata, :log_group, :log_stream, :buffer, :timer_ref] + + @max_buffer_size 1000 + @max_retries 3 + @retry_delay 500 + @flush_interval 5000 @impl true def init({__MODULE__, opts}) when is_list(opts) do @@ -29,14 +34,62 @@ defmodule Cadet.Logger.CloudWatchLogger do @impl true def handle_event({level, _gl, {Logger, msg, ts, md}}, state) do - %{format: format, metadata: metadata} = state + %{ + format: format, + metadata: metadata, + buffer: buffer, + log_stream: log_stream, + log_group: log_group + } = state if meet_level?(level, state.level) do formatted_msg = Logger.Formatter.format(format, level, msg, ts, take_metadata(md, metadata)) - send_to_cloudwatch_async(formatted_msg, state) + timestamp = timestamp_from_logger_ts(ts) + + log_event = %{ + "timestamp" => timestamp, + "message" => IO.chardata_to_string(formatted_msg) + } + + new_buffer = [log_event | buffer] + + new_buffer = + if length(new_buffer) >= @max_buffer_size do + flush_buffer_async(log_stream, log_group, new_buffer) + [] + else + new_buffer + end + + {:ok, %{state | buffer: new_buffer}} + else + {:ok, state} end + end + + @impl true + def handle_info(:flush_buffer, state) do + %{buffer: buffer, timer_ref: timer_ref, log_stream: log_stream, log_group: log_group} = state + + new_state = + if length(buffer) > 0 do + flush_buffer_sync(log_stream, log_group, buffer) + %{state | buffer: []} + else + state + end + + new_timer_ref = schedule_flush(@flush_interval) + {:ok, %{new_state | timer_ref: new_timer_ref}} + end + + @impl true + def terminate(_reason, state) do + %{log_stream: log_stream, log_group: log_group, buffer: buffer, timer_ref: timer_ref} = state - {:ok, state} + if timer_ref, do: Process.cancel_timer(timer_ref) + flush_buffer_sync(log_stream, log_group, buffer) + :ok end def handle_event(_, state), do: {:ok, state} @@ -56,25 +109,33 @@ defmodule Cadet.Logger.CloudWatchLogger do Logger.compare_levels(lvl, min) != :lt end - defp send_to_cloudwatch_async(msg, state) do - Task.start(fn -> send_to_cloudwatch(msg, state) end) + defp flush_buffer_async(log_stream, log_group, buffer) do + if length(buffer) > 0 do + Task.start(fn -> send_to_cloudwatch(log_stream, log_group, buffer) end) + end end - defp send_to_cloudwatch(msg, state) do - %{log_group: log_group, log_stream: log_stream} = state + defp flush_buffer_sync(log_stream, log_group, buffer) do + if length(buffer) > 0 do + send_to_cloudwatch(log_stream, log_group, buffer) + end + end + defp schedule_flush(interval) do + Process.send_after(self(), :flush_buffer, interval) + end + + defp send_to_cloudwatch(log_stream, log_group, buffer) do # Ensure that the already have ExAws authentication configured with :ok <- check_exaws_config() do - operation = build_log_operation(msg, state) + operation = build_log_operation(log_stream, log_group, buffer) operation |> send_with_retry() end end - defp build_log_operation(msg, state) do - %{log_group: log_group, log_stream: log_stream} = state - + defp build_log_operation(log_stream, log_group, buffer) do # The headers and body structure can be found in the AWS API documentation: # https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html %ExAws.Operation.JSON{ @@ -87,12 +148,7 @@ defmodule Cadet.Logger.CloudWatchLogger do data: %{ "logGroupName" => log_group, "logStreamName" => log_stream, - "logEvents" => [ - %{ - "timestamp" => :os.system_time(:millisecond), - "message" => msg - } - ] + "logEvents" => Enum.reverse(buffer) } } end @@ -109,7 +165,7 @@ defmodule Cadet.Logger.CloudWatchLogger do end end - defp send_with_retry(operation, retries \\ 3) + defp send_with_retry(operation, retries \\ @max_retries) defp send_with_retry(operation, retries) when retries > 0 do case ExAws.request(operation) do @@ -119,7 +175,7 @@ defmodule Cadet.Logger.CloudWatchLogger do {:error, reason} -> Logger.error("Failed to send log to CloudWatch: #{inspect(reason)}. Retrying...") # Wait before retrying - :timer.sleep(500) + :timer.sleep(@retry_delay) send_with_retry(operation, retries - 1) end end @@ -135,6 +191,7 @@ defmodule Cadet.Logger.CloudWatchLogger do metadata = configure_metadata(raw_metadata) log_group = Keyword.get(config, :log_group, "cadet-logs") log_stream = Keyword.get(config, :log_stream, "#{node()}-#{:os.system_time(:second)}") + timer_ref = schedule_flush(@flush_interval) %{ state @@ -142,7 +199,9 @@ defmodule Cadet.Logger.CloudWatchLogger do format: format, metadata: metadata, log_group: log_group, - log_stream: log_stream + log_stream: log_stream, + buffer: [], + timer_ref: timer_ref } end @@ -162,6 +221,24 @@ defmodule Cadet.Logger.CloudWatchLogger do end) end + defp timestamp_from_logger_ts({{year, month, day}, {hour, minute, second, microsecond}}) do + datetime = %DateTime{ + year: year, + month: month, + day: day, + hour: hour, + minute: minute, + second: second, + microsecond: {microsecond, 6}, + time_zone: "Etc/UTC", + zone_abbr: "UTC", + utc_offset: 0, + std_offset: 0 + } + + DateTime.to_unix(datetime, :millisecond) + end + defp read_env do Application.get_env(:logger, __MODULE__, Application.get_env(:logger, :cloudwatch_logger, [])) end From 8da8e2d1c72aa036454b7913b1f491e5b9841475 Mon Sep 17 00:00:00 2001 From: tckeong Date: Fri, 20 Jun 2025 01:19:58 +0800 Subject: [PATCH 7/8] Enhance aws validation check. --- lib/cadet/logger/cloudwatch_logger.ex | 39 +++++++++++++++++++++------ 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/lib/cadet/logger/cloudwatch_logger.ex b/lib/cadet/logger/cloudwatch_logger.ex index da83d9f38..44282a42c 100644 --- a/lib/cadet/logger/cloudwatch_logger.ex +++ b/lib/cadet/logger/cloudwatch_logger.ex @@ -42,7 +42,7 @@ defmodule Cadet.Logger.CloudWatchLogger do log_group: log_group } = state - if meet_level?(level, state.level) do + if meet_level?(level, state.level) and not meet_cloudwatch_error?(msg) do formatted_msg = Logger.Formatter.format(format, level, msg, ts, take_metadata(md, metadata)) timestamp = timestamp_from_logger_ts(ts) @@ -109,6 +109,14 @@ defmodule Cadet.Logger.CloudWatchLogger do Logger.compare_levels(lvl, min) != :lt end + defp meet_cloudwatch_error?(msg) when is_binary(msg) do + String.starts_with?(msg, "Failed to send log to CloudWatch") + end + + defp meet_cloudwatch_error?(_) do + false + end + defp flush_buffer_async(log_stream, log_group, buffer) do if length(buffer) > 0 do Task.start(fn -> send_to_cloudwatch(log_stream, log_group, buffer) end) @@ -154,14 +162,29 @@ defmodule Cadet.Logger.CloudWatchLogger do end defp check_exaws_config do - if :ets.whereis(ExAws.Config.AuthCache) == :undefined do - Logger.error( - "ExAws.Config.AuthCache is not available. Please ensure ExAws is properly configured." - ) + id = System.get_env("AWS_ACCESS_KEY_ID") + secret = System.get_env("AWS_SECRET_ACCESS_KEY") + region = Application.get_env(:ex_aws, :region) || System.get_env("AWS_REGION") - :error - else - :ok + cond do + is_nil(id) or id == "" or is_nil(secret) or secret == "" -> + Logger.error( + "Failed to send log to CloudWatch. AWS credentials missing. Ensure AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set. + " + ) + + :error + + region in [nil, ""] -> + Logger.error( + "Failed to send log to CloudWatch. AWS region not configured. Set AWS_REGION or :region under :ex_aws in config. + " + ) + + :error + + true -> + :ok end end From 7e55e87e1348e49e045cc27fa58cb38dd8b99b74 Mon Sep 17 00:00:00 2001 From: tckeong Date: Fri, 20 Jun 2025 15:37:02 +0800 Subject: [PATCH 8/8] Prevent timer_ref leak --- lib/cadet/logger/cloudwatch_logger.ex | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/cadet/logger/cloudwatch_logger.ex b/lib/cadet/logger/cloudwatch_logger.ex index 44282a42c..95c2e11a5 100644 --- a/lib/cadet/logger/cloudwatch_logger.ex +++ b/lib/cadet/logger/cloudwatch_logger.ex @@ -71,6 +71,8 @@ defmodule Cadet.Logger.CloudWatchLogger do def handle_info(:flush_buffer, state) do %{buffer: buffer, timer_ref: timer_ref, log_stream: log_stream, log_group: log_group} = state + if timer_ref, do: Process.cancel_timer(timer_ref) + new_state = if length(buffer) > 0 do flush_buffer_sync(log_stream, log_group, buffer)