Skip to content

Commit 25c2c7f

Browse files
authored
i3 worker integration (#568)
1 parent ef234dc commit 25c2c7f

File tree

13 files changed

+120
-103
lines changed

13 files changed

+120
-103
lines changed

docker/standard/entrypoint.sh

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,18 @@ exec_createsuperuser() {
2323
}
2424

2525
exec_index_schema_apply() {
26-
VIRTUAL_ENV=/core_app/.venv && cd /core_app && poetry run ./manage.py index_schema apply
26+
echo "RUNNING: exec_index_schema_apply"
27+
if [[ -z "${PAPERMERGE__SEARCH__URL}" ]]; then
28+
echo "env var PAPERMERGE__SEARCH__URL is NON-EMPTY... running..."
29+
cd /core_app && poetry run paper-cli index-schema apply
30+
fi
2731
}
2832

2933
exec_init() {
3034
exec_migrate
3135
exec_perms_sync
3236
exec_createsuperuser
37+
exec_index_schema_apply
3338
}
3439

3540
rm -f /etc/nginx/nginx.conf

papermerge/cli.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from papermerge.core.cli import token as token_cli
88
from papermerge.search.cli import search
99
from papermerge.search.cli import index
10+
from papermerge.search.cli import index_schema
1011

1112
app = typer.Typer(help="Papermerge DMS command line management tool")
1213
app.add_typer(usr_cli.app, name="users")
@@ -16,6 +17,7 @@
1617
app.add_typer(token_cli.app, name="tokens")
1718
app.add_typer(search.app, name="search")
1819
app.add_typer(index.app, name="index")
20+
app.add_typer(index_schema.app, name="index-schema")
1921

2022
if __name__ == "__main__":
2123
app()

papermerge/core/dbapi.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
get_docs_count_by_type,
99
update_doc_type,
1010
update_doc_cfv,
11-
get_doc_cfv
11+
get_doc_cfv,
12+
get_doc_ver_pages
1213
)
14+
from .features.nodes.db.api import get_nodes
1315
from .features.document_types.db.api import (
1416
create_document_type,
1517
get_document_types,
@@ -21,8 +23,10 @@
2123
from .features.custom_fields.db.api import create_custom_field
2224

2325
__all__ = [
26+
"get_nodes",
2427
"move_pages",
2528
"get_last_doc_ver",
29+
"get_doc_ver_pages",
2630
"get_doc_ver",
2731
"get_doc",
2832
"get_doc_cfv",

papermerge/core/features/document/schema.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ class Document(BaseModel):
203203
ocr: bool = True # will this document be OCRed?
204204
ocr_status: OCRStatusEnum = OCRStatusEnum.unknown
205205
thumbnail_url: ThumbnailUrl = None
206+
user_id: UUID
206207

207208
@field_validator("thumbnail_url", mode="before")
208209
def thumbnail_url_validator(cls, value, info):

papermerge/core/features/nodes/db/api.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def str2colexpr(keys: list[str]):
4545

4646

4747
def get_nodes(
48-
db_session: Session, user_id: UUID, node_ids: list[UUID] | None = None
48+
db_session: Session, user_id: UUID | None = None, node_ids: list[UUID] | None = None
4949
) -> list[schema.Document | schema.Folder]:
5050
items = []
5151
if node_ids is None:
@@ -55,18 +55,19 @@ def get_nodes(
5555
stmt = (
5656
select(orm.Node)
5757
.options(selectinload(orm.Node.tags))
58-
.filter(orm.Node.id.in_(node_ids), orm.Node.user_id == user_id)
58+
.filter(orm.Node.id.in_(node_ids))
5959
)
6060
else:
61-
stmt = (
62-
select(orm.Node)
63-
.options(selectinload(orm.Node.tags))
64-
.filter(orm.Node.user_id == user_id)
65-
)
61+
stmt = select(orm.Node).options(selectinload(orm.Node.tags))
62+
63+
if user_id is not None:
64+
stmt = stmt.filter(orm.Node.user_id == user_id)
6665

6766
nodes = db_session.scalars(stmt).all()
6867

6968
for node in nodes:
69+
breadcrumb = get_ancestors(db_session, node.id, include_self=False)
70+
node.breadcrumb = breadcrumb
7071
if node.ctype == "folder":
7172
items.append(schema.Folder.model_validate(node))
7273
else:

papermerge/core/features/nodes/router.py

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,16 @@
77
from fastapi import APIRouter, Depends, HTTPException, Query, Security
88
from sqlalchemy.exc import NoResultFound, IntegrityError
99

10-
from papermerge.celery_app import app as celery_app
10+
from papermerge.core.constants import INDEX_REMOVE_NODE
11+
from papermerge.core.tasks import send_task
1112
from papermerge.core import utils, schema, config
12-
from papermerge.core.features.auth import get_current_user
13-
from papermerge.core.features.auth import scopes
13+
from papermerge.core.features.auth import scopes, get_current_user
1414
from papermerge.core.constants import INDEX_ADD_NODE
1515
from papermerge.core.db.engine import Session
1616
from papermerge.core.features.document.db import api as doc_dbapi
1717
from papermerge.core.features.nodes.db import api as nodes_dbapi
1818
from papermerge.core.routers.common import OPEN_API_GENERIC_JSON_DETAIL
1919
from papermerge.core.routers.params import CommonQueryParams
20-
from papermerge.core.utils.decorators import if_redis_present
2120
from papermerge.core.exceptions import EntityNotFound
2221

2322

@@ -121,6 +120,7 @@ def create_node(
121120
if error:
122121
raise HTTPException(status_code=400, detail=error.model_dump())
123122

123+
send_task(INDEX_ADD_NODE, kwargs={"node_id": str(created_node.id)}, route_name="i3")
124124
return created_node
125125

126126

@@ -146,6 +146,7 @@ def update_node(
146146
db_session, node_id=node_id, user_id=user.id, attrs=node
147147
)
148148

149+
send_task(INDEX_ADD_NODE, kwargs={"node_id": str(updated_node.id)}, route_name="i3")
149150
return updated_node
150151

151152

@@ -173,6 +174,12 @@ def delete_nodes(
173174
if error:
174175
raise HTTPException(status_code=400, detail=error.model_dump())
175176

177+
send_task(
178+
INDEX_REMOVE_NODE,
179+
kwargs={"item_ids": [str(i) for i in list_of_uuids]},
180+
route_name="i3",
181+
)
182+
176183

177184
@router.post(
178185
"/move",
@@ -284,7 +291,7 @@ def assign_node_tags(
284291
if error:
285292
raise HTTPException(status_code=400, detail=error.model_dump())
286293

287-
_notify_index(node_id)
294+
send_task(INDEX_ADD_NODE, kwargs={"node_id": str(node_id)}, route_name="i3")
288295

289296
return node
290297

@@ -355,7 +362,7 @@ def update_node_tags(
355362
if error:
356363
raise HTTPException(status_code=400, detail=error.model_dump())
357364

358-
_notify_index(node.id)
365+
send_task(INDEX_ADD_NODE, kwargs={"node_id": str(node_id)}, route_name="i3")
359366

360367
return node
361368

@@ -412,11 +419,6 @@ def remove_node_tags(
412419
if error:
413420
raise HTTPException(status_code=400, detail=error.model_dump())
414421

415-
_notify_index(node.id)
416-
return node
417-
422+
send_task(INDEX_ADD_NODE, kwargs={"node_id": str(node_id)}, route_name="i3")
418423

419-
@if_redis_present
420-
def _notify_index(node_id: uuid.UUID):
421-
id_as_str = str(node_id) # just in case, make sure it is str
422-
celery_app.send_task(INDEX_ADD_NODE, kwargs={"node_id": id_as_str}, route_name="i3")
424+
return node

papermerge/core/features/nodes/schema.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ class Folder(NewFolder):
140140
tags: List[Tag] = []
141141
created_at: datetime
142142
updated_at: datetime
143+
user_id: UUID
143144

144145
breadcrumb: List[Tuple[UUID, str]] = []
145146

papermerge/core/features/page_mngm/db/api.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from pikepdf import Pdf
1111
from sqlalchemy import select, delete
1212

13-
from papermerge.celery_app import app as current_app
13+
from papermerge.core import tasks
1414
from papermerge.core import constants as const
1515
from papermerge.core.pathlib import abs_page_path
1616
from papermerge.core.storage import get_storage_instance
@@ -706,14 +706,18 @@ def get_docver_ids(db_session, document_ids: list[uuid.UUID]) -> list[uuid.UUID]
706706
@if_redis_present
707707
def notify_version_update(add_ver_id: str, remove_ver_id: str):
708708
# Send tasks to the index to remove/add pages
709-
current_app.send_task(const.INDEX_UPDATE, (add_ver_id, remove_ver_id))
709+
tasks.send_task(
710+
const.INDEX_UPDATE,
711+
kwargs={"add_ver_id": add_ver_id, "remove_ver_id": str(remove_ver_id)},
712+
route_name="i3",
713+
)
710714

711-
current_app.send_task(
715+
tasks.send_task(
712716
const.S3_WORKER_ADD_DOC_VER,
713717
kwargs={"doc_ver_ids": [add_ver_id]},
714718
route_name="s3",
715719
)
716-
current_app.send_task(
720+
tasks.send_task(
717721
const.S3_WORKER_REMOVE_DOC_VER,
718722
kwargs={"doc_ver_ids": [remove_ver_id]},
719723
route_name="s3",
@@ -724,13 +728,19 @@ def notify_version_update(add_ver_id: str, remove_ver_id: str):
724728
def notify_add_docs(db_session, add_doc_ids: List[uuid.UUID]):
725729
# send task to index
726730
logger.debug(f"Sending task {const.INDEX_ADD_DOCS} with {add_doc_ids}")
727-
current_app.send_task(const.INDEX_ADD_DOCS, (add_doc_ids,))
731+
tasks.send_task(
732+
const.INDEX_ADD_DOCS,
733+
kwargs={
734+
"doc_ids": [str(i) for i in add_doc_ids],
735+
},
736+
route_name="i3",
737+
)
728738

729739
ids = [
730740
str(doc_id) for doc_id in get_docver_ids(db_session, document_ids=add_doc_ids)
731741
]
732742

733-
current_app.send_task(
743+
tasks.send_task(
734744
const.S3_WORKER_ADD_DOC_VER,
735745
kwargs={"doc_ver_ids": ids},
736746
route_name="s3",
@@ -740,15 +750,15 @@ def notify_add_docs(db_session, add_doc_ids: List[uuid.UUID]):
740750
@if_redis_present
741751
def notify_generate_previews(doc_id: list[str] | str):
742752
if isinstance(doc_id, str):
743-
current_app.send_task(
753+
tasks.send_task(
744754
const.S3_WORKER_GENERATE_PREVIEW,
745755
kwargs={"doc_id": doc_id},
746756
route_name="s3preview",
747757
)
748758
return
749759
elif isinstance(doc_id, list):
750760
for item in doc_id:
751-
current_app.send_task(
761+
tasks.send_task(
752762
const.S3_WORKER_GENERATE_PREVIEW,
753763
kwargs={"doc_id": item},
754764
route_name="s3preview",

papermerge/search/cli/index.py

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
from salinic import IndexRW, create_engine
88
from typing_extensions import Annotated
99

10-
from papermerge.core import db, schemas
10+
from papermerge.core import dbapi, schema
11+
from papermerge.core.db.engine import Session
1112
from papermerge.search.schema import FOLDER, PAGE, SearchIndex
1213

1314
app = typer.Typer(help="Index commands")
@@ -25,42 +26,42 @@ def index_cmd(node_ids: NodeIDsType = None, dry_run: bool = False):
2526

2627
engine = create_engine(SEARCH_URL)
2728
index = IndexRW(engine, schema=SearchIndex)
28-
db_session = db.get_session()
2929

30-
nodes = db.get_nodes(db_session, node_ids)
31-
items = [] # to be added to the index
32-
for node in nodes:
33-
if isinstance(node, schemas.Document):
34-
last_ver = db.get_last_doc_ver(
35-
db_session, user_id=node.user_id, doc_id=node.id
36-
)
37-
pages = db.get_doc_ver_pages(db_session, last_ver.id)
38-
for page in pages:
30+
with Session() as db_session:
31+
nodes = dbapi.get_nodes(db_session, node_ids)
32+
items = [] # to be added to the index
33+
for node in nodes:
34+
if isinstance(node, schema.Document):
35+
last_ver = dbapi.get_last_doc_ver(
36+
db_session, user_id=node.user_id, doc_id=node.id
37+
)
38+
pages = dbapi.get_doc_ver_pages(db_session, last_ver.id)
39+
for page in pages:
40+
item = SearchIndex(
41+
id=str(page.id),
42+
title=node.title,
43+
user_id=str(node.user_id),
44+
document_id=str(node.id),
45+
document_version_id=str(last_ver.id),
46+
page_number=page.number,
47+
text=page.text,
48+
entity_type=PAGE,
49+
tags=[tag.name for tag in node.tags],
50+
)
51+
items.append(item)
52+
else:
3953
item = SearchIndex(
40-
id=str(page.id),
54+
id=str(node.id),
4155
title=node.title,
4256
user_id=str(node.user_id),
43-
document_id=str(node.id),
44-
document_version_id=str(last_ver.id),
45-
page_number=page.number,
46-
text=page.text,
47-
entity_type=PAGE,
57+
entity_type=FOLDER,
4858
tags=[tag.name for tag in node.tags],
4959
)
5060
items.append(item)
51-
else:
52-
item = SearchIndex(
53-
id=str(node.id),
54-
title=node.title,
55-
user_id=str(node.user_id),
56-
entity_type=FOLDER,
57-
tags=[tag.name for tag in node.tags],
58-
)
59-
items.append(item)
6061

61-
if dry_run:
62-
for item in items:
63-
print_json(data=item.model_dump())
64-
else:
65-
for item in items:
66-
index.add(item)
62+
if dry_run:
63+
for item in items:
64+
print_json(data=item.model_dump())
65+
else:
66+
for item in items:
67+
index.add(item)

papermerge/search/cli/index_schema.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
app = typer.Typer(help="Index Schema Management")
1010

11-
SEARCH_URL = os.environ.get('PAPERMERGE__SEARCH__URL')
11+
SEARCH_URL = os.environ.get("PAPERMERGE__SEARCH__URL")
1212
if not SEARCH_URL:
1313
raise ValueError("missing PAPERMERGE__SEARCH__URL")
1414

0 commit comments

Comments
 (0)