Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions lib/kith/contacts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1740,14 +1740,28 @@ defmodule Kith.Contacts do
end,
set: [contact_id: survivor.id]
)
# Remap photos
|> Ecto.Multi.update_all(
:remap_photos,
fn _changes ->
# Remap photos (delete duplicates by content_hash first, then move remaining)
|> Ecto.Multi.run(:remap_photos, fn repo, _changes ->
# Delete photos from non-survivor that already exist on survivor (same content_hash)
repo.query(
"""
DELETE FROM photos
WHERE contact_id = $1
AND content_hash IS NOT NULL
AND content_hash IN (
SELECT content_hash FROM photos WHERE contact_id = $2 AND content_hash IS NOT NULL
)
""",
[non_survivor.id, survivor.id]
)

# Move remaining photos
{count, _} =
from(p in Photo, where: p.contact_id == ^non_survivor.id)
end,
set: [contact_id: survivor.id]
)
|> repo.update_all(set: [contact_id: survivor.id])

{:ok, count}
end)
# Remap addresses
|> Ecto.Multi.update_all(
:remap_addresses,
Expand Down
6 changes: 6 additions & 0 deletions lib/kith/duplicate_detection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@ defmodule Kith.DuplicateDetection do
alias Kith.Contacts.DuplicateCandidate
alias Kith.Repo

@default_page_size 20

def list_candidates(account_id, opts \\ []) do
  # Lists duplicate candidates for an account, newest-best first.
  #
  # Options:
  #   :status — candidate status to filter on (default "pending")
  #   :limit  — page size (default @default_page_size)
  #   :offset — rows to skip for pagination (default 0)
  status = Keyword.get(opts, :status, "pending")
  page_size = Keyword.get(opts, :limit, @default_page_size)
  skip = Keyword.get(opts, :offset, 0)

  candidates =
    DuplicateCandidate
    |> scope_to_account(account_id)
    |> where([d], d.status == ^status)
    |> order_by([d], desc: d.score)
    |> limit(^page_size)
    |> offset(^skip)
    |> Repo.all()

  # Preload both sides of the pair so callers can render names directly.
  Repo.preload(candidates, [:contact, :duplicate_contact])
end
Expand Down
133 changes: 91 additions & 42 deletions lib/kith/workers/duplicate_detection_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,26 @@ defmodule Kith.Workers.DuplicateDetectionWorker do

Detection algorithm:
1. Name similarity via pg_trgm similarity() on display_name (threshold: 0.5)
2. Exact email match across contact_fields
3. Exact phone match across contact_fields
4. Weighted score: name(0.4) + email(0.35) + phone(0.25)
2. Case-insensitive email match across contact_fields
3. Normalized phone match across contact_fields (digits only)
4. Address match on line1 + postal_code

Scoring (max-signal + bonus):
Each signal has an independent base score:
- email_match: 0.85
- phone_match: 0.75
- address_match: 0.60
- name_match: the raw pg_trgm similarity (> 0.5)
Final score = max(base scores) + 0.05 per additional signal, capped at 1.0
Threshold: >= 0.5
"""

use Oban.Worker,
queue: :default,
max_attempts: 3

import Ecto.Query
alias Kith.Contacts.{Contact, ContactField, DuplicateCandidate}
alias Kith.Contacts.{Address, Contact, ContactField, ContactFieldType, DuplicateCandidate}
alias Kith.Repo

@impl Oban.Worker
Expand All @@ -36,33 +45,25 @@ defmodule Kith.Workers.DuplicateDetectionWorker do
end

defp detect_duplicates(account_id) do
  # Count active (non-deleted) contacts; detection is pointless with < 2.
  contact_count =
    Contact
    |> where([c], c.account_id == ^account_id)
    |> where([c], is_nil(c.deleted_at))
    |> Repo.aggregate(:count)

  # Return :ok in both branches: a bare `if` without `else` would yield nil
  # for accounts with fewer than two contacts, giving the caller two
  # different result shapes for a successful run.
  if contact_count >= 2 do
    find_duplicates(account_id)
  else
    :ok
  end
end

defp find_duplicates(account_id, _contacts) do
# Find name-based duplicates using pg_trgm
defp find_duplicates(account_id) do
name_matches = find_name_matches(account_id)

# Find email-based duplicates
email_matches = find_email_matches(account_id)

# Find phone-based duplicates
phone_matches = find_phone_matches(account_id)
address_matches = find_address_matches(account_id)

# Merge and score all matches
all_pairs =
merge_matches(name_matches, email_matches, phone_matches)
|> Enum.filter(fn {_pair, score, _reasons} -> score >= 0.4 end)
merge_matches(name_matches, email_matches, phone_matches, address_matches)
|> Enum.filter(fn {_pair, score, _reasons} -> score >= 0.5 end)

# Get existing pending/dismissed candidates to avoid re-inserting
existing =
DuplicateCandidate
|> where([d], d.account_id == ^account_id)
Expand All @@ -73,9 +74,7 @@ defmodule Kith.Workers.DuplicateDetectionWorker do

now = DateTime.utc_now() |> DateTime.truncate(:second)

# Insert new candidates
Enum.each(all_pairs, fn {{id1, id2}, score, reasons} ->
# Canonicalize: smaller id first
{contact_id, dup_id} = if id1 < id2, do: {id1, id2}, else: {id2, id1}

unless MapSet.member?(existing, {contact_id, dup_id}) do
Expand All @@ -93,7 +92,6 @@ defmodule Kith.Workers.DuplicateDetectionWorker do
end

defp find_name_matches(account_id) do
# Use pg_trgm similarity for fuzzy name matching
query = """
SELECT c1.id AS id1, c2.id AS id2, similarity(c1.display_name, c2.display_name) AS sim
FROM contacts c1
Expand All @@ -102,6 +100,8 @@ defmodule Kith.Workers.DuplicateDetectionWorker do
WHERE c1.account_id = $1
AND c1.deleted_at IS NULL
AND c2.deleted_at IS NULL
AND c1.display_name IS NOT NULL AND c1.display_name != ''
AND c2.display_name IS NOT NULL AND c2.display_name != ''
AND similarity(c1.display_name, c2.display_name) > 0.5
ORDER BY sim DESC
LIMIT 500
Expand All @@ -119,55 +119,94 @@ defmodule Kith.Workers.DuplicateDetectionWorker do
end

defp find_email_matches(account_id) do
  # Case-insensitive email match across contact_fields; both fields must be
  # verified as email type (protocol beginning with "mailto").
  # Normalize with LOWER(TRIM(...)) — consistent with find_address_matches —
  # so leading/trailing whitespace cannot defeat an otherwise exact match.
  query =
    from cf1 in ContactField,
      join: cf2 in ContactField,
      on:
        fragment("LOWER(TRIM(?))", cf1.value) == fragment("LOWER(TRIM(?))", cf2.value) and
          cf1.id < cf2.id,
      join: cft1 in ContactFieldType,
      on: cf1.contact_field_type_id == cft1.id,
      join: cft2 in ContactFieldType,
      on: cf2.contact_field_type_id == cft2.id,
      where: cf1.account_id == ^account_id,
      where: cf2.account_id == ^account_id,
      where: fragment("? LIKE 'mailto%'", cft1.protocol),
      where: fragment("? LIKE 'mailto%'", cft2.protocol),
      where: cf1.contact_id != cf2.contact_id,
      where: cf1.value != "" and not is_nil(cf1.value),
      select: {cf1.contact_id, cf2.contact_id}

  query
  |> Repo.all()
  # Canonicalize each pair (smaller id first) before deduplicating, so the
  # same two contacts matched via different fields collapse to one entry.
  |> Enum.map(fn {id1, id2} -> if id1 < id2, do: {id1, id2}, else: {id2, id1} end)
  |> Enum.uniq()
  |> Enum.map(fn {id1, id2} -> {{id1, id2}, 1.0, ["email_match"]} end)
end

defp find_phone_matches(account_id) do
  # Normalized phone match (digits only); both fields must be verified as
  # phone type (protocol beginning with "tel").
  #
  # The extra guard that the normalized value is non-empty is essential:
  # without it, any two digit-free values (e.g. "ext." and "n/a") both
  # normalize to "" and would be reported as a match.
  query =
    from cf1 in ContactField,
      join: cf2 in ContactField,
      on:
        fragment("regexp_replace(?, '[^0-9]', '', 'g')", cf1.value) ==
          fragment("regexp_replace(?, '[^0-9]', '', 'g')", cf2.value) and
          cf1.id < cf2.id,
      join: cft1 in ContactFieldType,
      on: cf1.contact_field_type_id == cft1.id,
      join: cft2 in ContactFieldType,
      on: cf2.contact_field_type_id == cft2.id,
      where: cf1.account_id == ^account_id,
      where: cf2.account_id == ^account_id,
      where: fragment("? LIKE 'tel%'", cft1.protocol),
      where: fragment("? LIKE 'tel%'", cft2.protocol),
      where: cf1.contact_id != cf2.contact_id,
      where: cf1.value != "" and not is_nil(cf1.value),
      where: fragment("regexp_replace(?, '[^0-9]', '', 'g') != ''", cf1.value),
      select: {cf1.contact_id, cf2.contact_id}

  query
  |> Repo.all()
  # Canonicalize (smaller id first), then deduplicate pairs.
  |> Enum.map(fn {id1, id2} -> if id1 < id2, do: {id1, id2}, else: {id2, id1} end)
  |> Enum.uniq()
  |> Enum.map(fn {id1, id2} -> {{id1, id2}, 1.0, ["phone_match"]} end)
end

defp find_address_matches(account_id) do
  # Pairs of contacts that share an address, matched on normalized
  # (lower-cased, trimmed) line1 + postal_code.
  query =
    from a1 in Address,
      join: a2 in Address,
      on:
        fragment("LOWER(TRIM(?))", a1.line1) == fragment("LOWER(TRIM(?))", a2.line1) and
          fragment("LOWER(TRIM(?))", a1.postal_code) ==
            fragment("LOWER(TRIM(?))", a2.postal_code) and
          a1.id < a2.id,
      where: a1.account_id == ^account_id,
      where: a2.account_id == ^account_id,
      where: a1.contact_id != a2.contact_id,
      where: a1.line1 != "" and not is_nil(a1.line1),
      where: a1.postal_code != "" and not is_nil(a1.postal_code),
      where: a2.line1 != "" and not is_nil(a2.line1),
      where: a2.postal_code != "" and not is_nil(a2.postal_code),
      select: {a1.contact_id, a2.contact_id}

  pairs = Repo.all(query)

  # Canonicalize pairs (smaller id first) and deduplicate before tagging.
  pairs
  |> Enum.map(fn {a, b} -> if a < b, do: {a, b}, else: {b, a} end)
  |> Enum.uniq()
  |> Enum.map(fn pair -> {pair, 1.0, ["address_match"]} end)
end

defp merge_matches(name_matches, email_matches, phone_matches, address_matches) do
  # Combine every signal's {pair, score, reasons} entries, group them by
  # contact pair, then fold each group into one scored candidate.
  [name_matches, email_matches, phone_matches, address_matches]
  |> Enum.concat()
  |> Enum.group_by(fn {pair, _score, _reasons} -> pair end)
  |> Enum.map(&compute_merged_score/1)
end
Expand All @@ -176,9 +215,19 @@ defmodule Kith.Workers.DuplicateDetectionWorker do
reasons = matches |> Enum.flat_map(fn {_, _, r} -> r end) |> Enum.uniq()
name_sim = Enum.find_value(matches, 0.0, &extract_name_score/1)

email_weight = if "email_match" in reasons, do: 0.35, else: 0.0
phone_weight = if "phone_match" in reasons, do: 0.25, else: 0.0
score = min(name_sim * 0.4 + email_weight + phone_weight, 1.0)
# Base score for each signal type
base_scores =
[]
|> then(fn acc -> if "email_match" in reasons, do: [0.85 | acc], else: acc end)
|> then(fn acc -> if "phone_match" in reasons, do: [0.75 | acc], else: acc end)
|> then(fn acc -> if "address_match" in reasons, do: [0.60 | acc], else: acc end)
|> then(fn acc -> if name_sim > 0.0, do: [name_sim | acc], else: acc end)

signal_count = length(base_scores)
max_score = Enum.max(base_scores, fn -> 0.0 end)
bonus = max(signal_count - 1, 0) * 0.05

score = min(max_score + bonus, 1.0)

{pair, Float.round(score, 2), reasons}
end
Expand Down
4 changes: 4 additions & 0 deletions lib/kith/workers/import_source_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule Kith.Workers.ImportSourceWorker do

alias Kith.Imports
alias Kith.Storage
alias Kith.Workers.DuplicateDetectionWorker

@impl Oban.Worker
def perform(%Oban.Job{args: %{"import_id" => import_id}}) do
Expand All @@ -33,6 +34,9 @@ defmodule Kith.Workers.ImportSourceWorker do
topic = "import:#{import.account_id}"
Phoenix.PubSub.broadcast(Kith.PubSub, topic, {:import_complete, summary_map})

# Trigger duplicate detection for newly imported contacts
Oban.insert(DuplicateDetectionWorker.new(%{account_id: import.account_id}))

Logger.info("Import #{import_id} completed: #{inspect(summary_map)}")
:ok
else
Expand Down
4 changes: 4 additions & 0 deletions lib/kith/workers/import_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule Kith.Workers.ImportWorker do

alias Kith.Contacts
alias Kith.VCard.Parser
alias Kith.Workers.DuplicateDetectionWorker

@impl Oban.Worker
def perform(%Oban.Job{
Expand Down Expand Up @@ -42,6 +43,9 @@ defmodule Kith.Workers.ImportWorker do
{:import_complete, results}
)

# Trigger duplicate detection for newly imported contacts
Oban.insert(DuplicateDetectionWorker.new(%{account_id: account_id}))

Logger.info(
"vCard import complete for account #{account_id}: " <>
"#{results.imported} imported, #{results.skipped} skipped"
Expand Down
4 changes: 4 additions & 0 deletions lib/kith/workers/monica_api_crawl_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule Kith.Workers.MonicaApiCrawlWorker do

alias Kith.Imports
alias Kith.Imports.Sources.MonicaApi
alias Kith.Workers.DuplicateDetectionWorker

@impl Oban.Worker
def perform(%Oban.Job{args: %{"import_id" => import_id}}) do
Expand Down Expand Up @@ -47,6 +48,9 @@ defmodule Kith.Workers.MonicaApiCrawlWorker do
topic = "import:#{import_job.account_id}"
Phoenix.PubSub.broadcast(Kith.PubSub, topic, {:import_complete, summary_map})

# Trigger duplicate detection for newly imported contacts
Oban.insert(DuplicateDetectionWorker.new(%{account_id: import_job.account_id}))

Logger.info("MonicaApi import #{import_id} completed: #{inspect(summary_map)}")
:ok
else
Expand Down
Loading
Loading