From fd7fe86a5e46c7c7c8336f8213ef369953d6c6a8 Mon Sep 17 00:00:00 2001 From: zookzook Date: Tue, 11 Feb 2025 09:41:38 +0100 Subject: [PATCH 1/4] fix: improved error handling when using change streams --- lib/mongo/change_stream.ex | 40 ++++++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/lib/mongo/change_stream.ex b/lib/mongo/change_stream.ex index d363db91..85ecff3d 100644 --- a/lib/mongo/change_stream.ex +++ b/lib/mongo/change_stream.ex @@ -61,9 +61,17 @@ defmodule Mongo.ChangeStream do %{docs: [], topology_pid: topology_pid, session: session, cursor: cursor, change_stream: change_stream, coll: coll} = state -> case get_more(topology_pid, session, only_coll(coll), cursor, change_stream, opts) do - {:ok, %{cursor_id: cursor_id, docs: docs, change_stream: change_stream}} -> {docs, %{state | cursor: cursor_id, change_stream: change_stream}} - {:resume, %{docs: docs} = state} -> {docs, %{state | docs: []}} - {:error, error} -> raise error + {:ok, %{cursor_id: cursor_id, docs: docs, change_stream: change_stream}} -> + {docs, %{state | cursor: cursor_id, change_stream: change_stream}} + + {:resume, %{docs: docs} = state} -> + {docs, %{state | docs: []}} + + {:aborted, _error} -> + {:halt, %{state | session: nil, cursor: 0}} + + {:error, error} -> + raise error end %{docs: docs} = state -> @@ -85,8 +93,11 @@ defmodule Mongo.ChangeStream do Session.end_session(topology_pid, session) case Error.should_retry_read(error, cmd, opts) do - true -> aggregate(topology_pid, cmd, fun, Keyword.put(opts, :read_counter, 2)) - false -> {:error, error} + true -> + aggregate(topology_pid, cmd, fun, Keyword.put(opts, :read_counter, 2)) + + false -> + {:error, error} end end end @@ -172,8 +183,15 @@ defmodule Mongo.ChangeStream do Session.end_session(topology_pid, session) # Start aggregation again... - with {:ok, state} <- aggregate(topology_pid, aggregate_cmd, fun, opts) do - {:resume, state} + ## the previous cursor is closed. If the retry fails we return aborted + ## and the calling function will remove the session and the cursor + ## the end function won't stuck when calling kill_cursor + case aggregate(topology_pid, aggregate_cmd, fun, opts) do + {:ok, new_state} -> + {:resume, new_state} + + error -> + {:aborted, error} end reason -> @@ -283,13 +301,15 @@ defmodule Mongo.ChangeStream do defp after_fun(opts) do fn + %{session: nil, cursor: 0} -> + :noop + %{topology_pid: topology_pid, session: session, cursor: 0} -> Session.end_session(topology_pid, session) %{topology_pid: topology_pid, session: session, cursor: cursor, coll: coll} -> - with :ok <- kill_cursor(session, only_coll(coll), cursor, opts) do - Session.end_session(topology_pid, session) - end + kill_cursor(session, only_coll(coll), cursor, opts) + Session.end_session(topology_pid, session) error -> error From c8aa5cbdae5fd612dff21659056881a85faf0978 Mon Sep 17 00:00:00 2001 From: zookzook Date: Tue, 11 Feb 2025 09:41:51 +0100 Subject: [PATCH 2/4] chore: updated dependencies --- mix.lock | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/mix.lock b/mix.lock index 32ae837a..9884e51d 100644 --- a/mix.lock +++ b/mix.lock @@ -1,30 +1,30 @@ %{ "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, "castore": {:hex, :castore, "1.0.8", "dedcf20ea746694647f883590b82d9e96014057aff1d44d03ec90f36a5c0dc6e", [:mix], [], "hexpm", "0b2b66d2ee742cb1d9cb8c8be3b43c3a70ee8651f37b75a8b982e036752983f1"}, - "credo": {:hex, :credo, "1.7.4", "68ca5cf89071511c12fd9919eb84e388d231121988f6932756596195ccf7fd35", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "9cf776d062c78bbe0f0de1ecaee183f18f2c3ec591326107989b054b7dddefc2"}, - "db_connection": {:hex, :db_connection, "2.6.0", "77d835c472b5b67fc4f29556dee74bf511bbafecdcaf98c27d27fa5918152086", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c2f992d15725e721ec7fbc1189d4ecdb8afef76648c746a8e1cad35e3b8a35f3"}, - "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, - "earmark_parser": {:hex, :earmark_parser, "1.4.41", "ab34711c9dc6212dda44fcd20ecb87ac3f3fce6f0ca2f28d4a00e4154f8cd599", [:mix], [], "hexpm", "a81a04c7e34b6617c2792e291b5a2e57ab316365c2644ddc553bb9ed863ebefa"}, + "credo": {:hex, :credo, "1.7.11", "d3e805f7ddf6c9c854fd36f089649d7cf6ba74c42bc3795d587814e3c9847102", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "56826b4306843253a66e47ae45e98e7d284ee1f95d53d1612bb483f88a8cf219"}, + "db_connection": {:hex, :db_connection, "2.7.0", "b99faa9291bb09892c7da373bb82cba59aefa9b36300f6145c5f201c7adf48ec", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "dcf08f31b2701f857dfc787fbad78223d61a32204f217f15e881dd93e4bdd3ff"}, + "decimal": {:hex, :decimal, "2.3.0", "3ad6255aa77b4a3c4f818171b12d237500e63525c2fd056699967a3e7ea20f62", [:mix], [], "hexpm", "a4d66355cb29cb47c3cf30e71329e58361cfcb37c34235ef3bf1d7bf3773aeac"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.43", "34b2f401fe473080e39ff2b90feb8ddfeef7639f8ee0bbf71bb41911831d77c5", [:mix], [], "hexpm", "970a3cd19503f5e8e527a190662be2cee5d98eed1ff72ed9b3d1a3d466692de8"}, "ex_doc": {:hex, :ex_doc, "0.32.2", "f60bbeb6ccbe75d005763e2a328e6f05e0624232f2393bc693611c2d3ae9fa0e", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "a4480305cdfe7fdfcbb77d1092c76161626d9a7aa4fb698aee745996e34602df"}, "ex_zstd": {:hex, :ex_zstd, "0.1.0", "4b1b5ebd7c0417e69308db8cdd478b9adb3e2d1a03b6e7366cf0a9aadeae11af", [:make, :mix], [{:ex_doc, ">= 0.0.0", [hex: :ex_doc, repo: "hexpm", optional: false]}], "hexpm", "2c9542a5c088e0eab14aa9b10d18bc084a6060ecf09025bbfc5b08684568bc67"}, "ezstd": {:hex, :ezstd, "1.1.0", "d3b483d6acfadfb65dba4015371e6d54526dbf3d9ef0941b5add8bf5890731f4", [:rebar3], [], "hexpm", "28cfa0ed6cc3922095ad5ba0f23392a1664273358b17184baa909868361184e7"}, - "file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"}, + "file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"}, "finch": {:hex, :finch, "0.18.0", "944ac7d34d0bd2ac8998f79f7a811b21d87d911e77a786bc5810adb75632ada4", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "69f5045b042e531e53edc2574f15e25e735b522c37e2ddb766e15b979e03aa65"}, "hpax": {:hex, :hpax, "1.0.0", "28dcf54509fe2152a3d040e4e3df5b265dcb6cb532029ecbacf4ce52caea3fd2", [:mix], [], "hexpm", "7f1314731d711e2ca5fdc7fd361296593fc2542570b3105595bb0bc6d0fad601"}, - "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, - "makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"}, - "makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"}, - "makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"}, + "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, + "makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"}, + "makeup_elixir": {:hex, :makeup_elixir, "1.0.1", "e928a4f984e795e41e3abd27bfc09f51db16ab8ba1aebdba2b3a575437efafc2", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7284900d412a3e5cfd97fdaed4f5ed389b8f2b4cb49efc0eb3bd10e2febf9507"}, + "makeup_erlang": {:hex, :makeup_erlang, "1.0.2", "03e1804074b3aa64d5fad7aa64601ed0fb395337b982d9bcf04029d68d51b6a7", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "af33ff7ef368d5893e4a267933e7744e46ce3cf1f61e2dccf53a111ed3aa3727"}, "mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"}, "mint": {:hex, :mint, "1.6.2", "af6d97a4051eee4f05b5500671d47c3a67dac7386045d87a904126fd4bbcea2e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "5ee441dffc1892f1ae59127f74afe8fd82fda6587794278d924e4d90ea3d63f9"}, "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, - "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"}, "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, "patch": {:hex, :patch, "0.12.0", "2da8967d382bade20344a3e89d618bfba563b12d4ac93955468e830777f816b0", [:mix], [], "hexpm", "ffd0e9a7f2ad5054f37af84067ee88b1ad337308a1cb227e181e3967127b0235"}, "r_zstd": {:hex, :r_zstd, "1.0.1", "4899cae27f14ac4e323786ee30359d3245ddff660bb50e8a6e2138347e8d187c", [:mix], [{:rustler, "~> 0.21", [hex: :rustler, repo: "hexpm", optional: false]}, {:unsafe, "~> 1.0", [hex: :unsafe, repo: "hexpm", optional: false]}], "hexpm", "9aacab907b6a11321303cd80e248a4d62de95bf1779bdef3a4afa2d99907626c"}, "req": {:hex, :req, "0.5.6", "8fe1eead4a085510fe3d51ad854ca8f20a622aae46e97b302f499dfb84f726ac", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cfaa8e720945d46654853de39d368f40362c2641c4b2153c886418914b372185"}, "rustler": {:hex, :rustler, "0.34.0", "e9a73ee419fc296a10e49b415a2eb87a88c9217aa0275ec9f383d37eed290c1c", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}, {:toml, "~> 0.6", [hex: :toml, repo: "hexpm", optional: false]}], "hexpm", "1d0c7449482b459513003230c0e2422b0252245776fe6fd6e41cb2b11bd8e628"}, - "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, + "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, "toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"}, "unsafe": {:hex, :unsafe, "1.0.2", "23c6be12f6c1605364801f4b47007c0c159497d0446ad378b5cf05f1855c0581", [:mix], [], "hexpm", "b485231683c3ab01a9cd44cb4a79f152c6f3bb87358439c6f68791b85c2df675"}, } From 99d03cefb414f576202601777be0b8e8b93071e4 Mon Sep 17 00:00:00 2001 From: zookzook Date: Tue, 11 Feb 2025 09:42:06 +0100 Subject: [PATCH 3/4] chore: removed warning when using Elixir 1.8.x --- test/bson/types_test.exs | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/test/bson/types_test.exs b/test/bson/types_test.exs index 8bc22c6c..0d33bbd0 100644 --- a/test/bson/types_test.exs +++ b/test/bson/types_test.exs @@ -17,11 +17,17 @@ defmodule BSON.TypesTest do assert inspect(@objectid) == "#BSON.ObjectId<#{@string}>" end - test "BSON.ObjectId.encode!/1" do - assert BSON.ObjectId.encode!(@objectid) == @string + if Version.match?(System.version(), "<= 1.8.0") do + test "BSON.ObjectId.encode!/1" do + assert BSON.ObjectId.encode!(@objectid) == @string - assert_raise FunctionClauseError, fn -> - BSON.ObjectId.encode!("") + assert_raise FunctionClauseError, fn -> + BSON.ObjectId.encode!("") + end + end + else + test "BSON.ObjectId.encode!/1" do + assert BSON.ObjectId.encode!(@objectid) == @string end end @@ -47,12 +53,19 @@ defmodule BSON.TypesTest do assert to_string(@objectid) == @string end - test "BSON.ObjectId.get_timestamp!/1" do - value = BSON.ObjectId.get_timestamp!(@objectid) - assert DateTime.compare(value, @timestamp) == :eq + if Version.match?(System.version(), "<= 1.8.0") do + test "BSON.ObjectId.get_timestamp!/1" do + value = BSON.ObjectId.get_timestamp!(@objectid) + assert DateTime.compare(value, @timestamp) == :eq - assert_raise FunctionClauseError, fn -> - BSON.ObjectId.get_timestamp!("") + assert_raise FunctionClauseError, fn -> + BSON.ObjectId.get_timestamp!("") + end + end + else + test "BSON.ObjectId.get_timestamp!/1" do + value = BSON.ObjectId.get_timestamp!(@objectid) + assert DateTime.compare(value, @timestamp) == :eq end end From d9b3ccb96c0033a8a05febe9a372a63d8f80a713 Mon Sep 17 00:00:00 2001 From: zookzook Date: Tue, 11 Feb 2025 10:12:39 +0100 Subject: [PATCH 4/4] make credo happy --- lib/mongo/events.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/mongo/events.ex b/lib/mongo/events.ex index 7d0f5b1c..4c9a7b94 100644 --- a/lib/mongo/events.ex +++ b/lib/mongo/events.ex @@ -1,5 +1,5 @@ defmodule Mongo.Events do - @doc false + @moduledoc false require Logger