Index definitions for type-search on all branches #86

Draft · wants to merge 8 commits into main
130 changes: 130 additions & 0 deletions sql/2025-05-29_scoped-definition-search.sql
@@ -0,0 +1,130 @@
-- Table of all root branch hashes which need to be synced to the relevant definition search index(es).
CREATE TABLE scoped_definition_search_queue (
-- Nullable release ID; if set, we'll also update the global definition search index.
release_id UUID NULL REFERENCES project_releases(id) ON DELETE CASCADE,
root_namespace_hash_id INTEGER PRIMARY KEY REFERENCES branch_hashes(id) ON DELETE CASCADE,
-- A user who has this code. We don't index variable names, so it doesn't matter _which_ user.
codebase_user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX scoped_definition_search_queue_created_at ON scoped_definition_search_queue(created_at ASC);
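
-- Usage sketch (hypothetical; the real claim query lives in the Haskell worker
-- as DDQ.claimUnsynced): a worker claims the oldest entry with SKIP LOCKED so
-- concurrent workers never pick up the same row:
--
--   SELECT release_id, root_namespace_hash_id, codebase_user_id
--     FROM scoped_definition_search_queue
--     ORDER BY created_at ASC
--     LIMIT 1
--     FOR UPDATE SKIP LOCKED;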

-- Port the old queue
INSERT INTO scoped_definition_search_queue (root_namespace_hash_id, codebase_user_id, created_at)
SELECT c.namespace_hash_id AS root_namespace_hash_id,
p.owner_user_id AS codebase_user_id,
gdsrq.created_at AS created_at
FROM global_definition_search_release_queue gdsrq
JOIN project_releases release ON gdsrq.release_id = release.id
JOIN causals c ON release.squashed_causal_id = c.id
JOIN projects p ON release.project_id = p.id
ON CONFLICT DO NOTHING;
TRUNCATE global_definition_search_release_queue;


-- Expand the global definition search to include all branch and release heads.

-- The set of roots we've indexed.
CREATE TABLE indexed_definition_search_doc_roots (
root_namespace_hash_id INTEGER PRIMARY KEY REFERENCES branch_hashes(id) ON DELETE CASCADE
);

-- A version of the global_definition_search_docs table, but contains search docs scoped by the root namespace rather
-- than the project and release.
-- This allows us to do a definition search within any branch.
-- It can eventually be used to replace the global_definition_search_docs table,
-- but that can be done separately in the future.
CREATE TABLE scoped_definition_search_docs (
root_namespace_hash_id INTEGER NOT NULL REFERENCES indexed_definition_search_doc_roots(root_namespace_hash_id) ON DELETE CASCADE,
-- Fully qualified name of the definition
name TEXT NOT NULL,
search_tokens TSVECTOR NOT NULL,
-- Number of arguments. 0 for values.
arity INT NOT NULL,
tag definition_tag NOT NULL,

-- Contains the rendered type signature, type, hash, etc.
-- so we don't need to look up types for hundreds of search results on the fly.
metadata JSONB NOT NULL,

-- Ostensibly there's the possibility of name conflicts,
-- but those are rare enough that we don't care; we just insert with ON CONFLICT DO NOTHING.
PRIMARY KEY (root_namespace_hash_id, name)
);

-- Port the old indexes to the new table.
INSERT INTO indexed_definition_search_doc_roots (root_namespace_hash_id)
SELECT DISTINCT c.namespace_hash_id AS root_namespace_hash_id
FROM global_definition_search_docs gds
JOIN project_releases release ON gds.release_id = release.id
JOIN causals c ON release.squashed_causal_id = c.id
ON CONFLICT DO NOTHING;

INSERT INTO scoped_definition_search_docs (root_namespace_hash_id, name, search_tokens, arity, tag, metadata)
SELECT
c.namespace_hash_id AS root_namespace_hash_id,
gds.name,
gds.search_tokens,
gds.arity,
gds.tag,
gds.metadata
FROM global_definition_search_docs gds
JOIN project_releases release ON gds.release_id = release.id
JOIN causals c ON release.squashed_causal_id = c.id
ON CONFLICT DO NOTHING;

-- Index for searching definitions by 'search token' within a given root namespace, with an optional tag filter.
-- P.S. there's a search token type for the name itself, so we don't need to index names separately.
CREATE INDEX scoped_definition_search_docs_tokens ON scoped_definition_search_docs USING GIN(root_namespace_hash_id, search_tokens, tag);
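
-- Usage sketch for the token index (the token text here is hypothetical; real
-- tokens are produced by the Haskell tokenizer in DefinitionSync):
--
--   SELECT name, metadata
--     FROM scoped_definition_search_docs
--     WHERE root_namespace_hash_id = 1234
--       AND search_tokens @@ to_tsquery('simple', 'someToken');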

-- Index for fuzzy-searching on the fully qualified name.
CREATE INDEX scoped_definition_search_docs_name_trigram ON scoped_definition_search_docs USING GIST (name gist_trgm_ops);
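
-- Usage sketch for fuzzy name search (pg_trgm must already be installed for
-- gist_trgm_ops above to exist): '%' filters by trigram similarity and '<->'
-- orders by trigram distance, both served by the GIST index:
--
--   SELECT name
--     FROM scoped_definition_search_docs
--     WHERE name % 'List.map'
--     ORDER BY name <-> 'List.map'
--     LIMIT 25;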

-- Triggers on branches and releases insert into the queue of roots to be synced.
CREATE OR REPLACE FUNCTION scoped_definition_search_queue_on_branch_change_trigger()
RETURNS TRIGGER AS $$
BEGIN
-- Enqueue on INSERT, or on UPDATE when the branch head (causal_id) actually changed.
IF TG_OP = 'INSERT' OR (TG_OP = 'UPDATE' AND OLD.causal_id IS DISTINCT FROM NEW.causal_id) THEN
INSERT INTO scoped_definition_search_queue (root_namespace_hash_id, codebase_user_id)
SELECT c.namespace_hash_id AS root_namespace_hash_id,
p.owner_user_id AS codebase_user_id
FROM causals c
JOIN projects p ON p.id = NEW.project_id
WHERE c.id = NEW.causal_id
ON CONFLICT DO NOTHING;
NOTIFY definition_sync;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER scoped_definition_search_queue_on_branch_change
AFTER INSERT OR UPDATE ON project_branches
FOR EACH ROW
EXECUTE FUNCTION scoped_definition_search_queue_on_branch_change_trigger();

CREATE OR REPLACE FUNCTION scoped_definition_search_queue_on_release_publish_trigger()
RETURNS TRIGGER AS $$
BEGIN
INSERT INTO scoped_definition_search_queue (root_namespace_hash_id, release_id, codebase_user_id)
SELECT c.namespace_hash_id AS root_namespace_hash_id,
NEW.id AS release_id,
p.owner_user_id AS codebase_user_id
FROM causals c
JOIN projects p ON p.id = NEW.project_id
WHERE c.id = NEW.squashed_causal_id
ON CONFLICT DO NOTHING;
-- Emit a notice so release indexing is visible in the Postgres logs.
RAISE NOTICE 'Added scoped definition search queue entry for release % in project %', NEW.id, NEW.project_id;
NOTIFY definition_sync;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER scoped_definition_search_queue_on_release_publish
AFTER INSERT ON project_releases
FOR EACH ROW
EXECUTE FUNCTION scoped_definition_search_queue_on_release_publish_trigger();
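
-- Usage sketch: the NOTIFYs fired by the triggers above only wake sessions
-- subscribed to the channel, so the sync worker is presumed to run the
-- equivalent of:
--
--   LISTEN definition_sync;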

80 changes: 42 additions & 38 deletions src/Share/BackgroundJobs/Search/DefinitionSync.hs
@@ -23,11 +23,10 @@ import Share.BackgroundJobs.Search.DefinitionSync.Types (Arity (..), DefinitionD
import Share.BackgroundJobs.Workers (newWorker)
import Share.Codebase (CodebaseM)
import Share.Codebase qualified as Codebase
import Share.IDs (ProjectId, ReleaseId)
import Share.IDs (ReleaseId, UserId)
import Share.Metrics qualified as Metrics
import Share.Postgres qualified as PG
import Share.Postgres.Cursors qualified as Cursors
import Share.Postgres.Hashes.Queries qualified as HashQ
import Share.Postgres.IDs (BranchHashId)
import Share.Postgres.NameLookups.Ops qualified as NLOps
import Share.Postgres.NameLookups.Types qualified as NL
@@ -37,7 +36,6 @@ import Share.Postgres.Releases.Queries qualified as RQ
import Share.Postgres.Search.DefinitionSearch.Queries qualified as DDQ
import Share.Prelude
import Share.PrettyPrintEnvDecl.Postgres qualified as PPEPostgres
import Share.Project (Project (..))
import Share.Release (Release (..))
import Share.Utils.Logging qualified as Logging
import Share.Web.Authorization qualified as AuthZ
@@ -87,10 +85,10 @@ worker scope = do
processReleases authZReceipt
where
processReleases authZReceipt = do
(mayErrs, mayProcessedRelease) <- Metrics.recordDefinitionSearchIndexDuration $ PG.runTransaction $ do
mayReleaseId <- DDQ.claimUnsyncedRelease
mayErrs <- for mayReleaseId (syncRelease authZReceipt)
pure (mayErrs, mayReleaseId)
(mayErrs, mayProcessedRelease) <- Metrics.recordDefinitionSearchIndexDuration $ PG.runTransactionMode PG.RepeatableRead PG.ReadWrite $ do
mayUnsynced <- DDQ.claimUnsynced
mayErrs <- for mayUnsynced (syncRoot authZReceipt)
pure (mayErrs, mayUnsynced)
case mayErrs of
Just errs@(_ : _rrs) -> Logging.logErrorText $ "Definition sync errors: " <> Text.intercalate "," (tShow <$> errs)
_ -> pure ()
@@ -104,41 +102,50 @@ worker scope = do
processReleases authZReceipt
Nothing -> pure ()

syncRelease ::
syncRoot ::
AuthZ.AuthZReceipt ->
ReleaseId ->
(Maybe ReleaseId, BranchHashId, UserId) ->
PG.Transaction e [DefnIndexingFailure]
syncRelease authZReceipt releaseId = fmap (fromMaybe []) . runMaybeT $ do
Release {projectId, releaseId, squashedCausal, version} <- lift $ PG.expectReleaseById releaseId
syncRoot authZReceipt (mayReleaseId, rootBranchHashId, codebaseOwner) = do
-- Only index it if it's not already indexed.
errs <-
(DDQ.isRootIndexed rootBranchHashId) >>= \case
False -> do
DDQ.markRootAsIndexed rootBranchHashId
namesPerspective <- NLOps.namesPerspectiveForRootAndPath rootBranchHashId (NL.PathSegments [])
let nlReceipt = NL.nameLookupReceipt namesPerspective
let codebaseLoc = Codebase.codebaseLocationForProjectRelease codebaseOwner
let codebase = Codebase.codebaseEnv authZReceipt codebaseLoc
Codebase.codebaseMToTransaction codebase $ do
termsCursor <- lift $ NLOps.projectTermsWithinRoot nlReceipt rootBranchHashId

termErrs <- syncTerms namesPerspective rootBranchHashId termsCursor
typesCursor <- lift $ NLOps.projectTypesWithinRoot nlReceipt rootBranchHashId
typeErrs <- syncTypes namesPerspective rootBranchHashId typesCursor
pure (termErrs <> typeErrs)
True -> pure mempty
-- Copy relevant index rows into the global search index as well
for mayReleaseId (syncRelease rootBranchHashId)
pure errs

syncRelease :: BranchHashId -> ReleaseId -> PG.Transaction e ()
syncRelease rootBranchHashId releaseId = fmap (fromMaybe ()) . runMaybeT $ do
Release {projectId, releaseId, version} <- lift $ PG.expectReleaseById releaseId
-- Wipe out any existing rows for this release. Normally there should be none, but this
-- makes it easy to re-index later if we change how we tokenize things.
latestVersion <- MaybeT $ RQ.latestReleaseVersionByProjectId projectId
-- Only index the latest version of a release.
guard $ version == latestVersion
lift $ DDQ.cleanIndexForProject projectId
Project {ownerUserId} <- lift $ PG.expectProjectById projectId
lift $ do
bhId <- HashQ.expectNamespaceIdsByCausalIdsOf id squashedCausal
namesPerspective <- NLOps.namesPerspectiveForRootAndPath bhId (NL.PathSegments [])
let nlReceipt = NL.nameLookupReceipt namesPerspective
let codebaseLoc = Codebase.codebaseLocationForProjectRelease ownerUserId
let codebase = Codebase.codebaseEnv authZReceipt codebaseLoc
Codebase.codebaseMToTransaction codebase $ do
termsCursor <- lift $ NLOps.projectTermsWithinRoot nlReceipt bhId

termErrs <- syncTerms namesPerspective bhId projectId releaseId termsCursor
typesCursor <- lift $ NLOps.projectTypesWithinRoot nlReceipt bhId
typeErrs <- syncTypes namesPerspective projectId releaseId typesCursor
pure (termErrs <> typeErrs)
-- Copy the indexed documents from the scoped index into the global index.
lift $ DDQ.copySearchDocumentsForRelease rootBranchHashId projectId releaseId

syncTerms ::
NL.NamesPerspective ->
BranchHashId ->
ProjectId ->
ReleaseId ->
Cursors.PGCursor (Name, Referent) ->
CodebaseM e [DefnIndexingFailure]
syncTerms namesPerspective bhId projectId releaseId termsCursor = do
syncTerms namesPerspective rootBranchHashId termsCursor = do
Cursors.foldBatched termsCursor defnBatchSize \terms -> do
(errs, refDocs) <-
PG.timeTransaction "Building terms" $
@@ -157,13 +164,12 @@ syncTerms namesPerspective bhId projectId releaseId termsCursor = do
& \case
(ns :| rest) -> ns :| take 1 rest
& Name.fromReverseSegments
termSummary <- lift $ Summary.termSummaryForReferent ref typ (Just displayName) bhId Nothing Nothing
termSummary <- lift $ Summary.termSummaryForReferent ref typ (Just displayName) rootBranchHashId Nothing Nothing
let sh = Referent.toShortHash ref
let (refTokens, arity) = tokensForTerm fqn ref typ termSummary
let dd =
DefinitionDocument
{ project = projectId,
release = releaseId,
{ rootBranchHashId,
fqn,
hash = sh,
tokens = refTokens,
@@ -177,7 +183,7 @@ syncTerms namesPerspective bhId projectId releaseId termsCursor = do
let allDeps = setOf (folded . folding tokens . folded . to LD.TypeReference) refDocs
pped <- PG.timeTransaction "Build PPED" $ PPEPostgres.ppedForReferences namesPerspective allDeps
let ppe = PPED.unsuffixifiedPPE pped
let namedDocs :: [DefinitionDocument ProjectId ReleaseId Name (Name, ShortHash)]
let namedDocs :: [DefinitionDocument Name (Name, ShortHash)]
namedDocs =
refDocs
& traversed . field @"tokens" %~ Set.mapMaybe \token -> do
@@ -311,11 +317,10 @@ typeSigTokens typ =

syncTypes ::
NL.NamesPerspective ->
ProjectId ->
ReleaseId ->
BranchHashId ->
Cursors.PGCursor (Name, TypeReference) ->
CodebaseM e [DefnIndexingFailure]
syncTypes namesPerspective projectId releaseId typesCursor = do
syncTypes namesPerspective rootBranchHashId typesCursor = do
Cursors.foldBatched typesCursor defnBatchSize \types -> do
(errs, refDocs) <-
types
@@ -335,8 +340,7 @@ syncTypes namesPerspective projectId releaseId typesCursor = do
let sh = Reference.toShortHash ref
let dd =
DefinitionDocument
{ project = projectId,
release = releaseId,
{ rootBranchHashId,
fqn,
hash = sh,
tokens = declTokens <> basicTokens,
@@ -349,7 +353,7 @@ syncTypes namesPerspective projectId releaseId typesCursor = do
let allDeps = setOf (folded . folding tokens . folded . to LD.TypeReference) refDocs
pped <- PPEPostgres.ppedForReferences namesPerspective allDeps
let ppe = PPED.unsuffixifiedPPE pped
let namedDocs :: [DefinitionDocument ProjectId ReleaseId Name (Name, ShortHash)]
let namedDocs :: [DefinitionDocument Name (Name, ShortHash)]
namedDocs =
refDocs
& traversed . field @"tokens" %~ Set.mapMaybe \token -> do
6 changes: 3 additions & 3 deletions src/Share/BackgroundJobs/Search/DefinitionSync/Types.hs
@@ -23,6 +23,7 @@ import Hasql.Encoders qualified as Encoders
import Hasql.Interpolate qualified as Hasql
import Servant (FromHttpApiData)
import Servant.API (FromHttpApiData (..), ToHttpApiData (..))
import Share.Postgres.IDs (BranchHashId)
import Share.Prelude
import Unison.DataDeclaration qualified as DD
import Unison.Name (Name)
@@ -153,9 +154,8 @@ data DefnSearchToken typeRef
| TypeModToken DD.Modifier
deriving stock (Show, Eq, Ord, Functor, Foldable, Traversable)

data DefinitionDocument proj release name typeRef = DefinitionDocument
{ project :: proj,
release :: release,
data DefinitionDocument name typeRef = DefinitionDocument
{ rootBranchHashId :: BranchHashId,
fqn :: Name,
hash :: ShortHash,
-- For now we only index types by their final name segment; may need to revisit this
4 changes: 1 addition & 3 deletions src/Share/Postgres/Queries.hs
@@ -24,7 +24,6 @@ import Share.Postgres (unrecoverableError)
import Share.Postgres qualified as PG
import Share.Postgres.IDs
import Share.Postgres.NameLookups.Types (NameLookupReceipt)
import Share.Postgres.Search.DefinitionSearch.Queries qualified as DDQ
import Share.Postgres.Users.Queries qualified as UserQ
import Share.Prelude
import Share.Project
@@ -640,7 +639,7 @@ createRelease ::
UserId ->
m (Release CausalId UserId)
createRelease !_nlReceipt projectId ReleaseVersion {major, minor, patch} squashedCausalId unsquashedCausalId creatorId = do
release@Release {releaseId} <-
release <-
PG.queryExpect1Row
[PG.sql|
INSERT INTO project_releases(
@@ -669,7 +668,6 @@ createRelease !_nlReceipt projectId ReleaseVersion {major, minor, patch} squashe
minor_version,
patch_version
|]
DDQ.submitReleaseToBeSynced releaseId
pure release

setBranchCausalHash ::