diff --git a/lib/cubdb.ex b/lib/cubdb.ex
index 9a3a851..5950316 100644
--- a/lib/cubdb.ex
+++ b/lib/cubdb.ex
@@ -126,7 +126,6 @@ defmodule CubDB do
 
   alias CubDB.Btree
   alias CubDB.CatchUp
-  alias CubDB.CleanUp
   alias CubDB.Compactor
   alias CubDB.Reader
   alias CubDB.Store
@@ -410,6 +409,7 @@ defmodule CubDB do
         reduce: fn n, sum -> sum + n end # reduce to the sum of selected values
       )
   """
+
   def select(db, options \\ []) when is_list(options) do
     timeout = Keyword.get(options, :timeout, :infinity)
     perform_read(db, {:select, options}, timeout)
@@ -417,6 +417,13 @@ defmodule CubDB do
 
   @spec size(GenServer.server()) :: pos_integer
 
+  @doc """
+  Returns the root location of the current store.
+  """
+  def root_loc(db) do
+    GenServer.call(db, :root_loc, :infinity)
+  end
+
   @doc """
   Returns the number of entries present in the database.
   """
@@ -835,23 +842,43 @@ defmodule CubDB do
     auto_compact = parse_auto_compact!(Keyword.get(options, :auto_compact, true))
     auto_file_sync = Keyword.get(options, :auto_file_sync, true)
 
-    with file_name when is_binary(file_name) or is_nil(file_name) <- find_db_file(data_dir),
-         {:ok, store} <-
-           Store.File.create(Path.join(data_dir, file_name || "0#{@db_file_extension}")),
-         {:ok, clean_up} <- CleanUp.start_link(data_dir),
-         {:ok, task_supervisor} <- Task.Supervisor.start_link() do
-      {:ok,
-       %State{
-         btree: Btree.new(store),
-         task_supervisor: task_supervisor,
-         data_dir: data_dir,
-         clean_up: clean_up,
-         auto_compact: auto_compact,
-         auto_file_sync: auto_file_sync
-       }}
-    else
-      {:error, reason} ->
-        {:stop, reason}
+    case data_dir do
+      store when is_map(store) ->
+        with {:ok, clean_up} <- Store.start_cleanup(store),
+             {:ok, task_supervisor} <- Task.Supervisor.start_link() do
+          {:ok,
+           %State{
+             btree: Btree.new(store),
+             task_supervisor: task_supervisor,
+             data_dir: data_dir,
+             clean_up: clean_up,
+             auto_compact: auto_compact,
+             auto_file_sync: auto_file_sync
+           }}
+        else
+          {:error, reason} ->
+            {:stop, reason}
+        end
+
+      _ ->
+        with file_name when is_binary(file_name) or is_nil(file_name) <- find_db_file(data_dir),
+             {:ok, store} <-
+               Store.File.create(Path.join(data_dir, file_name || "0#{@db_file_extension}")),
+             {:ok, clean_up} <- Store.start_cleanup(store),
+             {:ok, task_supervisor} <- Task.Supervisor.start_link() do
+          {:ok,
+           %State{
+             btree: Btree.new(store),
+             task_supervisor: task_supervisor,
+             data_dir: data_dir,
+             clean_up: clean_up,
+             auto_compact: auto_compact,
+             auto_file_sync: auto_file_sync
+           }}
+        else
+          {:error, reason} ->
+            {:stop, reason}
+        end
     end
   end
 
@@ -873,8 +900,13 @@ defmodule CubDB do
         nil
       end
 
-    %Btree{store: %Store.File{file_path: file_path}} = btree
-    {:noreply, %State{state | readers: Map.put(readers, ref, {file_path, timer})}}
+    %Btree{store: store} = btree
+    {:noreply, %State{state | readers: Map.put(readers, ref, {Store.identifier(store), timer})}}
+  end
+
+  def handle_call(:root_loc, _, state = %State{btree: btree}) do
+    %Btree{root_loc: loc, store: store} = btree
+    {:reply, {Store.identifier(store), loc}, state}
   end
 
   def handle_call(:size, _, state = %State{btree: btree}) do
@@ -1115,12 +1147,14 @@ defmodule CubDB do
 
   @spec trigger_compaction(%State{}) :: {:ok, pid} | {:error, any}
 
-  defp trigger_compaction(state = %State{btree: btree, data_dir: data_dir, clean_up: clean_up}) do
+  defp trigger_compaction(state = %State{btree: btree, clean_up: clean_up}) do
+    %Btree{store: store} = btree
+
     case compaction_running?(state) do
       false ->
         for pid <- state.subs, do: send(pid, :compaction_started)
-        {:ok, store} = new_compaction_store(data_dir)
-        CleanUp.clean_up_old_compaction_files(clean_up, store)
+        {:ok, store} = new_compaction_store(store)
+        Store.clean_up_old_compaction_files(store, clean_up)
 
         with result <-
                Task.Supervisor.start_child(state.task_supervisor, Compactor, :run, [
@@ -1181,22 +1215,10 @@ defmodule CubDB do
     Btree.new(store)
   end
 
-  @spec new_compaction_store(String.t()) :: {:ok, Store.t()} | {:error, any}
-
-  defp new_compaction_store(data_dir) do
-    with {:ok, file_names} <- File.ls(data_dir) do
-      new_filename =
-        file_names
-        |> Enum.filter(&cubdb_file?/1)
-        |> Enum.map(&file_name_to_n/1)
-        |> Enum.sort()
-        |> List.last()
-        |> (&(&1 + 1)).()
-        |> Integer.to_string(16)
-        |> (&(&1 <> @compaction_file_extension)).()
+  @spec new_compaction_store(Store.t()) :: {:ok, Store.t()} | {:error, any}
 
-      Store.File.create(Path.join(data_dir, new_filename))
-    end
+  defp new_compaction_store(store) do
+    Store.next_compaction_store(store)
   end
 
   @spec compaction_running?(%State{}) :: boolean
@@ -1236,11 +1258,13 @@ defmodule CubDB do
   @spec clean_up_now(%State{}) :: %State{}
 
   defp clean_up_now(state = %State{btree: btree, clean_up: clean_up}) do
+    %Btree{store: store} = btree
+
     for old_btree <- state.old_btrees do
       if Btree.alive?(old_btree), do: :ok = Btree.stop(old_btree)
     end
 
-    :ok = CleanUp.clean_up(clean_up, btree)
+    :ok = Store.clean_up(store, clean_up, btree)
     for pid <- state.subs, do: send(pid, :clean_up_started)
     %State{state | clean_up_pending: false, old_btrees: []}
   end
@@ -1319,26 +1343,13 @@ defmodule CubDB do
   defp split_options(data_dir_or_options) do
     case Keyword.pop(data_dir_or_options, :data_dir) do
       {nil, data_dir_or_options} ->
-        try do
-          {:ok, {to_string(data_dir_or_options), [], []}}
-        rescue
-          ArgumentError ->
-            {:error, "Options must include :data_dir"}
-
-          Protocol.UndefinedError ->
-            {:error, "data_dir must be a string (or implement String.Chars)"}
-        end
+        {:ok, {data_dir_or_options, [], []}}
 
       {data_dir, options} ->
         {gen_server_opts, opts} =
           Keyword.split(options, [:name, :timeout, :spawn_opt, :hibernate_after, :debug])
 
-        try do
-          {:ok, {to_string(data_dir), opts, gen_server_opts}}
-        rescue
-          Protocol.UndefinedError ->
-            {:error, "data_dir must be a string (or implement String.Chars)"}
-        end
+        {:ok, {data_dir, opts, gen_server_opts}}
     end
   end
 end
diff --git a/lib/cubdb/btree.ex b/lib/cubdb/btree.ex
index 486dfc1..c697b55 100644
--- a/lib/cubdb/btree.ex
+++ b/lib/cubdb/btree.ex
@@ -28,7 +28,7 @@ defmodule CubDB.Btree do
   @type val :: CubDB.value()
   @type btree_size :: non_neg_integer
   @type dirt :: non_neg_integer
-  @type location :: non_neg_integer
+  @type location :: any
   @type capacity :: pos_integer
   @type child_pointer :: {key, location}
   @type leaf_node :: record(:leaf, children: [child_pointer])
diff --git a/lib/cubdb/store.ex b/lib/cubdb/store.ex
index b530b10..d480eb2 100644
--- a/lib/cubdb/store.ex
+++ b/lib/cubdb/store.ex
@@ -7,6 +7,21 @@ defprotocol CubDB.Store do
 
   alias CubDB.Btree
 
+  @spec identifier(t) :: String.t()
+  def identifier(store)
+
+  @spec next_compaction_store(t) :: t
+  def next_compaction_store(store)
+
+  @spec start_cleanup(t) :: {:ok, pid} | {:error, String.t()}
+  def start_cleanup(t)
+
+  @spec clean_up_old_compaction_files(t, pid) :: :ok | {:error, String.t()}
+  def clean_up_old_compaction_files(t, cleanup)
+
+  @spec clean_up(t, pid, Btree.btree_node()) :: :ok | {:error, String.t()}
+  def clean_up(t, cleanup, btree)
+
   @spec put_node(t, Btree.btree_node()) :: Btree.location()
   def put_node(store, node)
 
diff --git a/lib/cubdb/store/file.ex b/lib/cubdb/store/file.ex
index 4c832ee..062ff66 100644
--- a/lib/cubdb/store/file.ex
+++ b/lib/cubdb/store/file.ex
@@ -32,7 +32,7 @@ defmodule CubDB.Store.File do
   defp init(file_path) do
     ensure_exclusive_access!(file_path)
 
-    {:ok, file} = :file.open(file_path, [:read, :append, :raw, :binary])
+    {:ok, file} = :file.open(file_path, [:read, :append, :raw, :binary, :delayed_write])
     {:ok, pos} = :file.position(file, :eof)
 
     {file, pos}
@@ -49,6 +49,45 @@ end
 defimpl CubDB.Store, for: CubDB.Store.File do
   alias CubDB.Store
   alias CubDB.Store.File.Blocks
+  alias CubDB.Store.File.CleanUp
+
+  @compaction_file_extension ".compact"
+
+  def identifier(%Store.File{file_path: file_path}) do
+    file_path
+  end
+
+  def clean_up(_store, cpid, btree) do
+    CleanUp.clean_up(cpid, btree)
+  end
+
+  def clean_up_old_compaction_files(store, pid) do
+    CleanUp.clean_up_old_compaction_files(pid, store)
+  end
+
+  def start_cleanup(%Store.File{file_path: file_path}) do
+    data_dir = Path.dirname(file_path)
+
+    CleanUp.start_link(data_dir)
+  end
+
+  def next_compaction_store(%Store.File{file_path: file_path}) do
+    data_dir = Path.dirname(file_path)
+
+    with {:ok, file_names} <- File.ls(data_dir) do
+      new_filename =
+        file_names
+        |> Enum.filter(&CubDB.cubdb_file?/1)
+        |> Enum.map(&CubDB.file_name_to_n/1)
+        |> Enum.sort()
+        |> List.last()
+        |> (&(&1 + 1)).()
+        |> Integer.to_string(16)
+        |> (&(&1 <> @compaction_file_extension)).()
+
+      Store.File.create(Path.join(data_dir, new_filename))
+    end
+  end
 
   def put_node(%Store.File{pid: pid}, node) do
     Agent.get_and_update(
diff --git a/lib/cubdb/clean_up.ex b/lib/cubdb/store/file/clean_up.ex
similarity index 98%
rename from lib/cubdb/clean_up.ex
rename to lib/cubdb/store/file/clean_up.ex
index 4b11537..b652336 100644
--- a/lib/cubdb/clean_up.ex
+++ b/lib/cubdb/store/file/clean_up.ex
@@ -1,4 +1,4 @@
-defmodule CubDB.CleanUp do
+defmodule CubDB.Store.File.CleanUp do
   @moduledoc false
 
   # The `CubDB.CleanUp` module takes care of cleaning up obsolete files, like
diff --git a/lib/cubdb/store/test_store.ex b/lib/cubdb/store/test_store.ex
index bd8e527..3c4258d 100644
--- a/lib/cubdb/store/test_store.ex
+++ b/lib/cubdb/store/test_store.ex
@@ -23,6 +23,26 @@ end
 defimpl CubDB.Store, for: CubDB.Store.TestStore do
   alias CubDB.Store.TestStore
 
+  def identifier(%TestStore{agent: pid}) do
+    # inspect/1 instead of string interpolation: String.Chars is not
+    # implemented for PIDs, so "#{pid}" would raise Protocol.UndefinedError
+    inspect(pid)
+  end
+
+  def clean_up(_store, _cpid, _btree) do
+    :ok
+  end
+
+  def clean_up_old_compaction_files(_store, _pid) do
+    :ok
+  end
+
+  def start_cleanup(%TestStore{}) do
+    {:ok, nil}
+  end
+
+  def next_compaction_store(%TestStore{}) do
+    # TestStore is the alias in scope here (CubDB.Store is not aliased as Store)
+    TestStore.create()
+  end
+
   def put_node(%TestStore{agent: agent}, node) do
     Agent.get_and_update(
       agent,
diff --git a/test/cubdb/clean_up_test.exs b/test/cubdb/clean_up_test.exs
index 4f7e153..94746cf 100644
--- a/test/cubdb/clean_up_test.exs
+++ b/test/cubdb/clean_up_test.exs
@@ -2,7 +2,7 @@ defmodule CubDB.Store.CleanUpTest do
   use ExUnit.Case, async: true
 
   alias CubDB.Btree
-  alias CubDB.CleanUp
+  alias CubDB.Store.File.CleanUp
   alias CubDB.Store
 
   setup do
diff --git a/test/cubdb/store/file_test.exs b/test/cubdb/store/file_test.exs
index 33c0dd5..3108ea3 100644
--- a/test/cubdb/store/file_test.exs
+++ b/test/cubdb/store/file_test.exs
@@ -30,6 +30,7 @@ defmodule CubDB.Store.FileTest do
 
     CubDB.Store.put_header(store, good_header)
     CubDB.Store.put_header(store, {0, 0, 0})
+    CubDB.Store.sync(store)
 
     # corrupt the last header
     {:ok, file} = :file.open(store.file_path, [:read, :write, :raw, :binary])
@@ -46,6 +47,7 @@ defmodule CubDB.Store.FileTest do
 
     CubDB.Store.put_header(store, good_header)
     CubDB.Store.put_header(store, {0, 0, 0})
+    CubDB.Store.sync(store)
 
     # truncate the last header
     {:ok, file} = :file.open(store.file_path, [:read, :write, :raw, :binary])
diff --git a/test/cubdb_test.exs b/test/cubdb_test.exs
index 41ec198..cb2eceb 100644
--- a/test/cubdb_test.exs
+++ b/test/cubdb_test.exs
@@ -701,7 +701,7 @@ defmodule CubDBTest do
   test "data_dir/1 returns the path to the data directory", %{tmp_dir: tmp_dir} do
     {:ok, db} = CubDB.start_link(tmp_dir)
     tmp_dir_string = to_string(tmp_dir)
-    assert ^tmp_dir_string = CubDB.data_dir(db)
+    assert ^tmp_dir_string = to_string(CubDB.data_dir(db))
   end
 
   test "current_db_file/1 returns the path to the current database file", %{tmp_dir: tmp_dir} do
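
With the changes to split_options/1 and init/1 above, a CubDB instance can apparently be started on a Store struct passed as :data_dir instead of a directory path. A minimal usage sketch, assuming the in-memory CubDB.Store.TestStore extended by this patch (not verified against the full branch):

    # start a database backed by an in-memory store rather than a data directory
    {:ok, store} = CubDB.Store.TestStore.create()
    {:ok, db} = CubDB.start_link(data_dir: store)

    :ok = CubDB.put(db, :a, 1)
    1 = CubDB.get(db, :a)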