From 1c24b72483d049687d0c8f429945954340c1a8e0 Mon Sep 17 00:00:00 2001 From: MikaAK Date: Mon, 27 Apr 2026 11:21:13 -0700 Subject: [PATCH] fix: wait for ETS/DETS/Counter tables to be ready before start_link returns MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cache.ETS, Cache.DETS, and Cache.Counter all used Task.start_link to spawn a hibernating owner process, but the table setup ran inside the task. start_link returned as soon as the task was spawned, so the supervisor considered the child started before the table actually existed — callers that touched the cache immediately after the supervisor came up could hit a missing table. Each adapter's task now sends a :ready signal back to the parent after setup, and start_link blocks on that signal (or on {:EXIT, pid, reason} if setup crashes). The supervisor sees a genuinely synchronous start. Cache.PersistentTerm has no setup and Cache.RefreshAhead.start_tracker already creates its table before Task.start_link, so neither needs the handshake. --- lib/cache/counter.ex | 28 ++++++++++++++++-------- lib/cache/dets.ex | 48 ++++++++++++++++++++++++---------------- lib/cache/ets.ex | 52 ++++++++++++++++++++++++++------------------ 3 files changed, 79 insertions(+), 49 deletions(-) diff --git a/lib/cache/counter.ex b/lib/cache/counter.ex index 3aa10f5..c62fb43 100644 --- a/lib/cache/counter.ex +++ b/lib/cache/counter.ex @@ -97,15 +97,25 @@ defmodule Cache.Counter do @impl Cache def start_link(opts) do - Task.start_link(fn -> - cache_name = opts[:table_name] - initial_size = opts[:initial_size] || 1 - counters_opts = if opts[:write_concurrency], do: [:atomics, :write_concurrency], else: [:atomics] - ref = :counters.new(initial_size, counters_opts) - :persistent_term.put({cache_name, @ref_key}, ref) - :persistent_term.put({cache_name, @size_key}, initial_size) - Process.hibernate(Function, :identity, [nil]) - end) + parent = self() + ready_ref = make_ref() + + {:ok, pid} = + Task.start_link(fn -> + cache_name = opts[:table_name] + initial_size = opts[:initial_size] || 1 + counters_opts = if opts[:write_concurrency], do: [:atomics, :write_concurrency], else: [:atomics] + ref = :counters.new(initial_size, counters_opts) + :persistent_term.put({cache_name, @ref_key}, ref) + :persistent_term.put({cache_name, @size_key}, initial_size) + send(parent, {ready_ref, :ready}) + Process.hibernate(Function, :identity, [nil]) + end) + + receive do + {^ready_ref, :ready} -> {:ok, pid} + {:EXIT, ^pid, reason} -> {:error, reason} + end end @impl Cache diff --git a/lib/cache/dets.ex b/lib/cache/dets.ex index 40ac30a..3df2033 100644 --- a/lib/cache/dets.ex +++ b/lib/cache/dets.ex @@ -365,25 +365,35 @@ defmodule Cache.DETS do @impl Cache def start_link(opts) do - Task.start_link(fn -> - table_name = opts[:table_name] - - file_path = - opts[:file_path] - |> to_string - |> create_file_name(table_name) - |> tap(&File.mkdir_p!(Path.dirname(&1))) - |> String.to_charlist() - - opts = - opts - |> Keyword.drop([:table_name, :file_path]) - |> Kernel.++(access: :read_write, file: file_path) - - {:ok, _} = :dets.open_file(table_name, opts) - - Process.hibernate(Function, :identity, [nil]) - end) + parent = self() + ref = make_ref() + + {:ok, pid} = + Task.start_link(fn -> + table_name = opts[:table_name] + + file_path = + opts[:file_path] + |> to_string + |> create_file_name(table_name) + |> tap(&File.mkdir_p!(Path.dirname(&1))) + |> String.to_charlist() + + opts = + opts + |> Keyword.drop([:table_name, :file_path]) + |> Kernel.++(access: :read_write, file: file_path) + + {:ok, _} = :dets.open_file(table_name, opts) + + send(parent, {ref, :ready}) + Process.hibernate(Function, :identity, [nil]) + end) + + receive do + {^ref, :ready} -> {:ok, pid} + {:EXIT, ^pid, reason} -> {:error, reason} + end end defp create_file_name(file_path, table_name) do diff --git a/lib/cache/ets.ex b/lib/cache/ets.ex index 71d607c..f23169b 100644 --- a/lib/cache/ets.ex +++ b/lib/cache/ets.ex @@ -617,30 +617,40 @@ defmodule Cache.ETS do @impl Cache def start_link(opts) do - Task.start_link(fn -> - table_name = opts[:table_name] - rehydration_path = opts[:rehydration_path] - - ets_opts = - opts - |> Keyword.drop([:table_name, :type, :rehydration_path]) - |> Kernel.++([opts[:type], :public, :named_table]) - - ets_opts = - if opts[:compressed] do - Keyword.delete(ets_opts, :compressed) ++ [:compressed] - else - ets_opts + parent = self() + ref = make_ref() + + {:ok, pid} = + Task.start_link(fn -> + table_name = opts[:table_name] + rehydration_path = opts[:rehydration_path] + + ets_opts = + opts + |> Keyword.drop([:table_name, :type, :rehydration_path]) + |> Kernel.++([opts[:type], :public, :named_table]) + + ets_opts = + if opts[:compressed] do + Keyword.delete(ets_opts, :compressed) ++ [:compressed] + else + ets_opts + end + + rehydrate_or_create_table(table_name, rehydration_path, ets_opts) + + if rehydration_path do + setup_exit_signal_handlers(table_name, rehydration_path) end - rehydrate_or_create_table(table_name, rehydration_path, ets_opts) + send(parent, {ref, :ready}) + Process.hibernate(Function, :identity, [nil]) + end) - if rehydration_path do - setup_exit_signal_handlers(table_name, rehydration_path) - end - - Process.hibernate(Function, :identity, [nil]) - end) + receive do + {^ref, :ready} -> {:ok, pid} + {:EXIT, ^pid, reason} -> {:error, reason} + end end defp rehydrate_or_create_table(table_name, rehydration_path, ets_opts)