Skip to content

Set read_preference to primaryPreferred when topology single with replica set. #266

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/mongo/read_preference.ex
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,7 @@ defmodule Mongo.ReadPreference do

filter_nils(read_preference)
end

def to_topology_single_type({_, %{replica?: true} = _server_description}), do: %{mode: :primaryPreferred}
def to_topology_single_type(_), do: nil
end
16 changes: 12 additions & 4 deletions lib/mongo/server_description.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ defmodule Mongo.ServerDescription do
compression: [compressor_types],
read_only: boolean(),
logical_session_timeout: non_neg_integer,
supports_retryable_writes: boolean()
supports_retryable_writes: boolean(),
replica?: boolean()
}

@empty %{
Expand Down Expand Up @@ -69,7 +70,8 @@ defmodule Mongo.ServerDescription do
compression: [],
read_only: false,
logical_session_timeout: 30,
support_retryable_writes: false
support_retryable_writes: false,
replica?: false
}

def new() do
Expand Down Expand Up @@ -115,7 +117,8 @@ defmodule Mongo.ServerDescription do
compression: map_compressors(hello_response["compression"]),
read_only: hello_response["readOnly"] || false,
logical_session_timeout: hello_response["logicalSessionTimeoutMinutes"] || 30,
supports_retryable_writes: supports_retryable_writes
supports_retryable_writes: supports_retryable_writes,
replica?: replica?(server_type)
}
end

Expand Down Expand Up @@ -147,7 +150,8 @@ defmodule Mongo.ServerDescription do
compression: map_compressors(hello_response["compression"]),
read_only: hello_response["readOnly"] || false,
logical_session_timeout: hello_response["logicalSessionTimeoutMinutes"] || 30,
supports_retryable_writes: server_type != :standalone && max_wire_version >= @retryable_wire_version && hello_response["logicalSessionTimeoutMinutes"] != nil
supports_retryable_writes: server_type != :standalone && max_wire_version >= @retryable_wire_version && hello_response["logicalSessionTimeoutMinutes"] != nil,
replica?: replica?(server_type)
}
end

Expand Down Expand Up @@ -187,4 +191,8 @@ defmodule Mongo.ServerDescription do
[:zlib]
end
end

defp replica?(server_type) do
server_type in [:rs_primary, :rs_secondary, :rs_arbiter, :rs_other, :rs_ghost]
end
end
29 changes: 16 additions & 13 deletions lib/mongo/topology_description.ex
Original file line number Diff line number Diff line change
Expand Up @@ -123,19 +123,20 @@ defmodule Mongo.TopologyDescription do
|> Keyword.get(:read_preference)
|> ReadPreference.merge_defaults()

{servers, read_prefs} =
{server, read_prefs} =
case topology.type do
:unknown ->
{[], nil}
{nil, nil}

:single ->
{topology.servers, nil}
server = pick_server(topology.servers)
{server, ReadPreference.to_topology_single_type(server)}

:sharded ->
{mongos_servers(topology), ReadPreference.to_mongos(read_preference)}
{mongos_servers(topology) |> pick_server(), ReadPreference.to_mongos(read_preference)}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you execute mix credo? It should complain about the |> operator.

topology  |> mongos_servers() |> pick_server()

Looks nicer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated it. Thanks!


_other ->
{select_replica_set_server(topology, read_preference.mode, read_preference), ReadPreference.to_replica_set(read_preference)}
{select_replica_set_server(topology, read_preference.mode, read_preference) |> pick_server(), ReadPreference.to_replica_set(read_preference)}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

topology  |> select_replica_set_server(read_preference.mode, read_preference) |> pick_server()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated it. Thanks!

end

opts =
Expand All @@ -147,17 +148,12 @@ defmodule Mongo.TopologyDescription do
Keyword.put(opts, :read_preference, prefs)
end

server =
servers
|> Enum.take_random(1)
|> Enum.map(fn {server, description} -> {server, description.compression} end)

case server do
[] ->
nil ->
:empty

[{addr, compression}] ->
{:ok, {addr, merge_compression(opts, compression)}}
{addr, server_description} ->
{:ok, {addr, merge_compression(opts, server_description.compression)}}
end
end

Expand All @@ -182,6 +178,13 @@ defmodule Mongo.TopologyDescription do
end
end

defp pick_server(servers) do
case Enum.take_random(servers, 1) do
[] -> nil
[server] -> server
end
end

defp mongos_servers(%{:servers => servers}) do
Enum.filter(servers, fn {_, server} -> server.type == :mongos end)
end
Expand Down
8 changes: 8 additions & 0 deletions test/mongo/topology_description_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,12 @@ defmodule Mongo.TopologyDescriptionTest do

assert :single = TopologyDescription.get_type(opts)
end

test "Set read_preference to :primaryPreferred when topology is single and server is replica set" do
assert {:ok, {_, opts}} = TopologyDescription.select_servers(single(), :read, [])
assert nil == Keyword.get(opts, :read_preference)

assert {:ok, {_, opts}} = TopologyDescription.select_servers(single_with_repl_set(), :read, [])
assert :primaryPreferred = Keyword.get(opts, :read_preference) |> Map.get(:mode)
end
end
62 changes: 53 additions & 9 deletions test/support/topology_test_data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,43 @@ defmodule Mongo.TopologyTestData do
set_version: nil,
tag_set: %{},
type: :standalone,
compression: []
compression: [],
replica?: false
}
}
}

def single_with_repl_set(),
do: %{
set_name: nil,
type: :single,
compatibility_error: nil,
compatible: true,
local_threshold_ms: 15,
max_election_id: nil,
max_set_version: nil,
servers: %{
"localhost:27017" => %{
address: "localhost:27017",
arbiters: [],
election_id: nil,
error: nil,
hosts: [],
last_update_time: nil,
last_write_date: nil,
max_wire_version: 4,
me: nil,
min_wire_version: 0,
op_time: nil,
passives: [],
primary: nil,
round_trip_time: 44,
set_name: nil,
set_version: nil,
tag_set: %{},
type: :standalone,
compression: [],
replica?: true
}
}
}
Expand Down Expand Up @@ -64,7 +100,8 @@ defmodule Mongo.TopologyTestData do
set_version: nil,
tag_set: %{},
type: :mongos,
compression: []
compression: [],
replica?: false
}
}
}
Expand Down Expand Up @@ -102,7 +139,8 @@ defmodule Mongo.TopologyTestData do
"localhost:27018",
"localhost:27019",
"localhost:27020"
]
],
replica?: true
},
"localhost:27019" => %{
address: "localhost:27019",
Expand All @@ -127,7 +165,8 @@ defmodule Mongo.TopologyTestData do
"localhost:27018",
"localhost:27019",
"localhost:27020"
]
],
replica?: true
},
"localhost:27020" => %{
address: "localhost:27020",
Expand All @@ -152,7 +191,8 @@ defmodule Mongo.TopologyTestData do
"localhost:27018",
"localhost:27019",
"localhost:27020"
]
],
replica?: true
}
}
}
Expand Down Expand Up @@ -186,7 +226,8 @@ defmodule Mongo.TopologyTestData do
tag_set: %{},
type: :unknown,
hosts: [],
compression: []
compression: [],
replica?: true
},
"localhost:27019" => %{
address: "localhost:27019",
Expand All @@ -211,7 +252,8 @@ defmodule Mongo.TopologyTestData do
"localhost:27019",
"localhost:27020"
],
compression: []
compression: [],
replica?: true
},
"localhost:27020" => %{
address: "localhost:27020",
Expand All @@ -236,7 +278,8 @@ defmodule Mongo.TopologyTestData do
"localhost:27019",
"localhost:27020"
],
compression: []
compression: [],
replica?: true
}
}
}
Expand Down Expand Up @@ -270,7 +313,8 @@ defmodule Mongo.TopologyTestData do
tag_set: %{},
type: :rs_primary,
hosts: ["localhost:27018"],
compression: []
compression: [],
replica?: true
}
}
}
Expand Down
Loading