Skip to content

Commit fb47a75

Browse files
committed
Add simple compute backend
1 parent 1c5f391 commit fb47a75

File tree

14 files changed

+531
-91
lines changed

14 files changed

+531
-91
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3131
- Add recursive error propagation
3232
- Add reconnection support for Snowflake vector search
3333
- Add scanning for leaked passwords on the CI
34+
- Simple detached-lifetime compute and vector-search implementations
3435

3536
### Bug fixes
3637

superduper/backends/base/scheduler.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from superduper import CFG, logging
1111
from superduper.backends.base.backends import BaseBackend
1212
from superduper.base.base import Base
13-
from superduper.base.event import Create, CreateTable, PutComponent, Update
13+
from superduper.base.event import Create, CreateTable, Event, PutComponent, Update
1414

1515
DependencyType = t.Union[t.Dict[str, str], t.Sequence[t.Dict[str, str]]]
1616

@@ -142,7 +142,7 @@ def _consume_event_type(event_type, ids, table, db: 'Datalayer'):
142142

143143

144144
def cluster_events(
145-
events: t.List[Base],
145+
events: t.List[Event],
146146
):
147147
"""
148148
Cluster events into table, create and job events.
@@ -162,14 +162,14 @@ def cluster_events(
162162
elif isinstance(event, (Update, Create)):
163163
create_events.append(event)
164164
elif isinstance(event, Job):
165-
job_events.append(event)
165+
job_events.append(event) # type: ignore[arg-type]
166166
elif isinstance(event, PutComponent):
167167
put_events.append(event)
168168
return table_events, create_events, put_events, job_events
169169

170170

171171
def consume_events(
172-
events: t.List[Base],
172+
events: t.List[Event],
173173
table: str,
174174
db: 'Datalayer',
175175
batch_size: int | None = None,

superduper/backends/local/scheduler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
consume_events,
88
)
99
from superduper.base import Base
10+
from superduper.base.event import Event
1011
from superduper.components.cdc import CDC
1112
from superduper.misc.importing import isreallyinstance
1213

@@ -73,7 +74,7 @@ def initialize(self):
7374
with self.lock:
7475
self.Q[component, identifier] = []
7576

76-
def publish(self, events: t.List[Base]):
77+
def publish(self, events: t.List[Event]):
7778
"""
7879
Publish events to local queue.
7980
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
from .cluster import SimpleCluster as Cluster
22

3-
__all__ = ['Cluster']
3+
__all__ = ['Cluster']

superduper/backends/simple/cluster.py

Lines changed: 77 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
1+
import os
12
import typing as t
23

34
import click
4-
import os
55

6-
from superduper import logging
6+
from superduper import logging, superduper
77
from superduper.backends.base.cluster import Cluster
88
from superduper.backends.local.cdc import LocalCDCBackend
9-
from superduper.backends.simple.compute import SimpleComputeClient
109
from superduper.backends.local.crontab import LocalCrontabBackend
11-
from superduper.backends.simple.scheduler import SimpleScheduler
1210
from superduper.backends.local.vector_search import LocalVectorSearchBackend
11+
from superduper.backends.simple.compute import SimpleComputeBackend, SimpleComputeClient
12+
from superduper.backends.simple.scheduler import SimpleScheduler
13+
from superduper.backends.simple.vector_search import (
14+
SimpleVectorSearch,
15+
SimpleVectorSearchClient,
16+
)
1317
from superduper.misc.importing import load_plugin
1418

1519

@@ -24,15 +28,55 @@ class SimpleCluster(Cluster):
2428
:param crontab: The crontab backend.
2529
"""
2630

31+
@classmethod
32+
def build(cls, CFG, **kwargs):
33+
"""Build the local cluster."""
34+
return SimpleCluster(
35+
scheduler=SimpleScheduler(),
36+
compute=SimpleComputeClient(),
37+
vector_search=SimpleVectorSearchClient(),
38+
cdc=LocalCDCBackend(),
39+
crontab=LocalCrontabBackend(),
40+
)
41+
42+
def drop(self, force: bool = False):
43+
"""Drop the cluster.
44+
45+
:param force: Force drop the cluster.
46+
"""
47+
if not force:
48+
if not click.confirm(
49+
"Are you sure you want to drop the cluster? ",
50+
default=False,
51+
):
52+
logging.warn("Aborting...")
53+
54+
self.vector_search.drop(force=True)
55+
self.compute.drop(force=True)
56+
57+
58+
class SimpleClusterBackend(Cluster):
59+
"""Simple cluster for running infra on a single machine.
60+
61+
:param compute: The compute backend.
62+
:param cache: The cache backend.
63+
:param scheduler: The scheduler backend.
64+
:param vector_search: The vector search backend.
65+
:param cdc: The change data capture backend
66+
:param crontab: The crontab backend.
67+
"""
68+
2769
@classmethod
2870
def build(cls, CFG, **kwargs):
2971
"""Build the local cluster."""
3072
searcher_impl = load_plugin(CFG.vector_search_engine).VectorSearcher
3173

3274
return SimpleCluster(
3375
scheduler=SimpleScheduler(),
34-
compute=SimpleComputeClient(),
35-
vector_search=LocalVectorSearchBackend(searcher_impl=searcher_impl),
76+
compute=SimpleComputeBackend(),
77+
vector_search=SimpleVectorSearch(
78+
backend=LocalVectorSearchBackend(searcher_impl=searcher_impl)
79+
),
3680
cdc=LocalCDCBackend(),
3781
crontab=LocalCrontabBackend(),
3882
)
@@ -47,4 +91,30 @@ def drop(self, force: bool = False):
4791
"Are you sure you want to drop the cluster? ",
4892
default=False,
4993
):
50-
logging.warn("Aborting...")
94+
logging.warn("Aborting...")
95+
96+
self.vector_search.drop()
97+
self.compute.drop()
98+
99+
100+
if __name__ == "__main__":
101+
102+
import uvicorn
103+
from fastapi import FastAPI
104+
105+
from superduper import CFG
106+
107+
db = superduper(cluster_engine='local')
108+
109+
app = FastAPI()
110+
111+
cluster = SimpleClusterBackend.build(CFG=CFG)
112+
113+
cluster.compute.build(app=app)
114+
cluster.vector_search.build(app=app)
115+
116+
db.cluster = cluster
117+
118+
cluster.db = db
119+
120+
uvicorn.run(app, host='localhost', port=8001)

0 commit comments

Comments
 (0)