From 0727fbc845f87aa2e5988a679521a75d59a67cf8 Mon Sep 17 00:00:00 2001 From: Cola Cheng Date: Wed, 5 Feb 2025 18:27:38 +0800 Subject: [PATCH 1/3] Set read preference as primaryPreferred when topology is single and is replica. --- lib/mongo/read_preference.ex | 3 +++ lib/mongo/server_description.ex | 13 ++++++++++--- lib/mongo/topology_description.ex | 29 ++++++++++++++++------------- test/support/topology_test_data.ex | 27 ++++++++++++++++++--------- 4 files changed, 47 insertions(+), 25 deletions(-) diff --git a/lib/mongo/read_preference.ex b/lib/mongo/read_preference.ex index 3671f5e3..fb3ea131 100644 --- a/lib/mongo/read_preference.ex +++ b/lib/mongo/read_preference.ex @@ -141,4 +141,7 @@ defmodule Mongo.ReadPreference do filter_nils(read_preference) end + + def to_topology_single_type({_, %{replica?: true} = _server_description}), do: %{mode: :primaryPreferred} + def to_topology_single_type(_), do: nil end diff --git a/lib/mongo/server_description.ex b/lib/mongo/server_description.ex index ece27c17..68943fa5 100644 --- a/lib/mongo/server_description.ex +++ b/lib/mongo/server_description.ex @@ -41,7 +41,8 @@ defmodule Mongo.ServerDescription do compression: [compressor_types], read_only: boolean(), logical_session_timeout: non_neg_integer, - supports_retryable_writes: boolean() + supports_retryable_writes: boolean(), + replica?: boolean() } @empty %{ @@ -69,7 +70,8 @@ defmodule Mongo.ServerDescription do compression: [], read_only: false, logical_session_timeout: 30, - support_retryable_writes: false + support_retryable_writes: false, + replica?: false } def new() do @@ -147,7 +149,8 @@ defmodule Mongo.ServerDescription do compression: map_compressors(hello_response["compression"]), read_only: hello_response["readOnly"] || false, logical_session_timeout: hello_response["logicalSessionTimeoutMinutes"] || 30, - supports_retryable_writes: server_type != :standalone && max_wire_version >= @retryable_wire_version && hello_response["logicalSessionTimeoutMinutes"] != nil + supports_retryable_writes: server_type != :standalone && max_wire_version >= @retryable_wire_version && hello_response["logicalSessionTimeoutMinutes"] != nil, + replica?: replica?(server_type) } end @@ -187,4 +190,8 @@ defmodule Mongo.ServerDescription do [:zlib] end end + + defp replica?(server_type) do + server_type in [:rs_primary, :rs_secondary, :rs_arbiter, :rs_other, :rs_ghost] + end end diff --git a/lib/mongo/topology_description.ex b/lib/mongo/topology_description.ex index 180f536d..7d06a1da 100644 --- a/lib/mongo/topology_description.ex +++ b/lib/mongo/topology_description.ex @@ -123,19 +123,20 @@ defmodule Mongo.TopologyDescription do |> Keyword.get(:read_preference) |> ReadPreference.merge_defaults() - {servers, read_prefs} = + {server, read_prefs} = case topology.type do :unknown -> - {[], nil} + {nil, nil} :single -> - {topology.servers, nil} + server = pick_server(topology.servers) + {server, ReadPreference.to_topology_single_type(server)} :sharded -> - {mongos_servers(topology), ReadPreference.to_mongos(read_preference)} + {mongos_servers(topology) |> pick_server(), ReadPreference.to_mongos(read_preference)} _other -> - {select_replica_set_server(topology, read_preference.mode, read_preference), ReadPreference.to_replica_set(read_preference)} + {select_replica_set_server(topology, read_preference.mode, read_preference) |> pick_server(), ReadPreference.to_replica_set(read_preference)} end opts = @@ -147,17 +148,12 @@ defmodule Mongo.TopologyDescription do Keyword.put(opts, :read_preference, prefs) end - server = - servers - |> Enum.take_random(1) - |> Enum.map(fn {server, description} -> {server, description.compression} end) - case server do - [] -> + nil -> :empty - [{addr, compression}] -> - {:ok, {addr, merge_compression(opts, compression)}} + {addr, server_description} -> + {:ok, {addr, merge_compression(opts, server_description.compression)}} end end @@ -182,6 +178,13 @@ defmodule Mongo.TopologyDescription do end end + defp pick_server(servers) do + case Enum.take_random(servers, 1) do + [] -> nil + [server] -> server + end + end + defp mongos_servers(%{:servers => servers}) do Enum.filter(servers, fn {_, server} -> server.type == :mongos end) end diff --git a/test/support/topology_test_data.ex b/test/support/topology_test_data.ex index e22ef3ac..b1842ea0 100644 --- a/test/support/topology_test_data.ex +++ b/test/support/topology_test_data.ex @@ -30,7 +30,8 @@ defmodule Mongo.TopologyTestData do set_version: nil, tag_set: %{}, type: :standalone, - compression: [] + compression: [], + replica?: false } } } @@ -64,7 +65,8 @@ defmodule Mongo.TopologyTestData do set_version: nil, tag_set: %{}, type: :mongos, - compression: [] + compression: [], + replica?: false } } } @@ -102,7 +104,8 @@ defmodule Mongo.TopologyTestData do "localhost:27018", "localhost:27019", "localhost:27020" - ] + ], + replica?: true }, "localhost:27019" => %{ address: "localhost:27019", @@ -127,7 +130,8 @@ defmodule Mongo.TopologyTestData do "localhost:27018", "localhost:27019", "localhost:27020" - ] + ], + replica?: true }, "localhost:27020" => %{ address: "localhost:27020", @@ -152,7 +156,8 @@ defmodule Mongo.TopologyTestData do "localhost:27018", "localhost:27019", "localhost:27020" - ] + ], + replica?: true } } } @@ -186,7 +191,8 @@ defmodule Mongo.TopologyTestData do tag_set: %{}, type: :unknown, hosts: [], - compression: [] + compression: [], + replica?: true }, "localhost:27019" => %{ address: "localhost:27019", @@ -211,7 +217,8 @@ defmodule Mongo.TopologyTestData do "localhost:27019", "localhost:27020" ], - compression: [] + compression: [], + replica?: true }, "localhost:27020" => %{ address: "localhost:27020", @@ -236,7 +243,8 @@ defmodule Mongo.TopologyTestData do "localhost:27019", "localhost:27020" ], - compression: [] + compression: [], + replica?: true } } } @@ -270,7 +278,8 @@ defmodule Mongo.TopologyTestData do tag_set: %{}, type: :rs_primary, hosts: ["localhost:27018"], - compression: [] + compression: [], + replica?: true } } } From 940af75fa3dff5977f2bf9c8af6149d54a2c11d7 Mon Sep 17 00:00:00 2001 From: Cola Cheng Date: Thu, 6 Feb 2025 12:42:16 +0800 Subject: [PATCH 2/3] Put missing replica? in ServerDescription.parse_hello_response/1 and add test. --- lib/mongo/server_description.ex | 3 +- test/mongo/topology_description_test.exs | 8 ++++++ test/support/topology_test_data.ex | 35 ++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/lib/mongo/server_description.ex b/lib/mongo/server_description.ex index 68943fa5..3d749698 100644 --- a/lib/mongo/server_description.ex +++ b/lib/mongo/server_description.ex @@ -117,7 +117,8 @@ defmodule Mongo.ServerDescription do compression: map_compressors(hello_response["compression"]), read_only: hello_response["readOnly"] || false, logical_session_timeout: hello_response["logicalSessionTimeoutMinutes"] || 30, - supports_retryable_writes: supports_retryable_writes + supports_retryable_writes: supports_retryable_writes, + replica?: replica?(server_type) } end diff --git a/test/mongo/topology_description_test.exs b/test/mongo/topology_description_test.exs index 2f448746..2b1d04a6 100644 --- a/test/mongo/topology_description_test.exs +++ b/test/mongo/topology_description_test.exs @@ -137,4 +137,12 @@ defmodule Mongo.TopologyDescriptionTest do assert :single = TopologyDescription.get_type(opts) end + + test "Set read_preference to :primaryPreferred when topology is single and server is replica set" do + assert {:ok, {_, opts}} = TopologyDescription.select_servers(single(), :read, []) + assert nil == Keyword.get(opts, :read_preference) + + assert {:ok, {_, opts}} = TopologyDescription.select_servers(single_with_repl_set(), :read, []) + assert :primaryPreferred = Keyword.get(opts, :read_preference) |> Map.get(:mode) + end end diff --git a/test/support/topology_test_data.ex b/test/support/topology_test_data.ex index b1842ea0..58bb9631 100644 --- a/test/support/topology_test_data.ex +++ b/test/support/topology_test_data.ex @@ -36,6 +36,41 @@ defmodule Mongo.TopologyTestData do } } + def single_with_repl_set(), + do: %{ + set_name: nil, + type: :single, + compatibility_error: nil, + compatible: true, + local_threshold_ms: 15, + max_election_id: nil, + max_set_version: nil, + servers: %{ + "localhost:27017" => %{ + address: "localhost:27017", + arbiters: [], + election_id: nil, + error: nil, + hosts: [], + last_update_time: nil, + last_write_date: nil, + max_wire_version: 4, + me: nil, + min_wire_version: 0, + op_time: nil, + passives: [], + primary: nil, + round_trip_time: 44, + set_name: nil, + set_version: nil, + tag_set: %{}, + type: :standalone, + compression: [], + replica?: true + } + } + } + def sharded(), do: %{ set_name: nil, From 7aefdf3b0f62b2086e423a28ce2a0dadfb4a0d69 Mon Sep 17 00:00:00 2001 From: Cola Cheng Date: Thu, 6 Feb 2025 14:57:02 +0800 Subject: [PATCH 3/3] update code format. --- lib/mongo/topology_description.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/mongo/topology_description.ex b/lib/mongo/topology_description.ex index 7d06a1da..a0ddc8fe 100644 --- a/lib/mongo/topology_description.ex +++ b/lib/mongo/topology_description.ex @@ -133,10 +133,10 @@ defmodule Mongo.TopologyDescription do {server, ReadPreference.to_topology_single_type(server)} :sharded -> - {mongos_servers(topology) |> pick_server(), ReadPreference.to_mongos(read_preference)} + {topology |> mongos_servers() |> pick_server(), ReadPreference.to_mongos(read_preference)} _other -> - {select_replica_set_server(topology, read_preference.mode, read_preference) |> pick_server(), ReadPreference.to_replica_set(read_preference)} + {topology |> select_replica_set_server(read_preference.mode, read_preference) |> pick_server(), ReadPreference.to_replica_set(read_preference)} end opts =