diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml
index 3576d66ac59..fa4d1de082a 100644
--- a/.github/workflows/benchmarks.yml
+++ b/.github/workflows/benchmarks.yml
@@ -183,7 +183,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 20 # on a successful run, runs in 8 minutes
container:
- image: rust:1.83.0
+ image: rust:1.88.0
options: --privileged
# filter for a comment containing 'benchmarks please'
if: ${{ github.event_name != 'issue_comment' || (github.event.issue.pull_request && contains(github.event.comment.body, 'benchmarks please')) }}
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index e0de3213dbb..873613afe19 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -113,62 +113,6 @@ jobs:
working-directory: crates/bindings-csharp
run: dotnet test -warnaserror
- sdk_test:
- name: SDK Tests
- runs-on: spacetimedb-runner
- steps:
- - name: Find Git ref
- env:
- GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- run: |
- PR_NUMBER="${{ github.event.inputs.pr_number || null }}"
- if test -n "${PR_NUMBER}"; then
- GIT_REF="$( gh pr view --repo clockworklabs/SpacetimeDB $PR_NUMBER --json headRefName --jq .headRefName )"
- else
- GIT_REF="${{ github.ref }}"
- fi
- echo "GIT_REF=${GIT_REF}" >>"$GITHUB_ENV"
-
- - name: Checkout sources
- uses: actions/checkout@v4
- with:
- ref: ${{ env.GIT_REF }}
-
- - uses: dsherret/rust-toolchain-file@v1
-
- - uses: actions/setup-dotnet@v3
- with:
- global-json-file: modules/global.json
-
- - name: Create /stdb dir
- run: |
- sudo mkdir /stdb
- sudo chmod 777 /stdb
-
- - name: Checkout C# SDK
- uses: actions/checkout@v4
- with:
- repository: clockworklabs/spacetimedb-csharp-sdk
- ref: master
- path: spacetimedb-csharp-sdk
-
- - name: Setup NuGet override for C# SDK
- working-directory: spacetimedb-csharp-sdk
- run: |
- dotnet pack ../crates/bindings-csharp/BSATN.Runtime
- # The SDK package overrides the following crate for use in their tests.
- # Even though it doesn't actually depend on it, we still need to pack it
- # so dotnet doesn't complain.
- dotnet pack ../crates/bindings-csharp/Runtime
- ./tools~/write-nuget-config.sh ..
-
- # clear package caches, so we get fresh ones even if version numbers haven't changed
- dotnet nuget locals all --clear
-
- - name: Run C# SDK tests
- working-directory: spacetimedb-csharp-sdk
- run: dotnet test
-
lints:
name: Lints
runs-on: spacetimedb-runner
diff --git a/.github/workflows/csharp-test.yml b/.github/workflows/csharp-test.yml
new file mode 100644
index 00000000000..befeefe728c
--- /dev/null
+++ b/.github/workflows/csharp-test.yml
@@ -0,0 +1,142 @@
+name: C#/Unity - Test Suite
+
+on:
+ push:
+ branches:
+ - master
+ pull_request:
+
+jobs:
+ unity-testsuite:
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v4
+
+ # Run cheap .NET tests first. If those fail, no need to run expensive Unity tests.
+
+ - name: Setup dotnet
+ uses: actions/setup-dotnet@v3
+ with:
+ global-json-file: modules/global.json
+
+ - name: Override NuGet packages
+ run: |
+ dotnet pack crates/bindings-csharp/BSATN.Runtime
+ dotnet pack crates/bindings-csharp/Runtime
+
+ # Write out the nuget config file to `nuget.config`. This causes the spacetimedb-csharp-sdk repository
+ # to be aware of the local versions of the `bindings-csharp` packages in SpacetimeDB, and use them if
+ # available. Otherwise, `spacetimedb-csharp-sdk` will use the NuGet versions of the packages.
+ # This means that (if version numbers match) we will test the local versions of the C# packages, even
+ # if they're not pushed to NuGet.
+ # See https://learn.microsoft.com/en-us/nuget/reference/nuget-config-file for more info on the config file.
+ cd sdks/csharp
+ ./tools~/write-nuget-config.sh ../..
+
+ - name: Run .NET tests
+ working-directory: sdks/csharp
+ run: dotnet test -warnaserror
+
+ - name: Verify C# formatting
+ working-directory: sdks/csharp
+ run: dotnet format --no-restore --verify-no-changes SpacetimeDB.ClientSDK.sln
+
+ # Now, setup the Unity tests.
+
+ - name: Patch spacetimedb dependency in Cargo.toml
+ working-directory: demo/Blackholio/server-rust
+ run: |
+ sed -i "s|spacetimedb *=.*|spacetimedb = \{ path = \"../../../crates/bindings\" \}|" Cargo.toml
+ cat Cargo.toml
+
+ - name: Install Rust toolchain
+ uses: dtolnay/rust-toolchain@stable
+
+ - name: Cache Rust dependencies
+ uses: Swatinem/rust-cache@v2
+ id: cache-rust-deps
+ with:
+ workspaces: demo/Blackholio/server-rust
+ key: ${{ steps.checkout-stdb.outputs.commit }}
+ # Cache Rust deps even if unit tests have failed.
+ cache-on-failure: true
+ # Cache the CLI as well.
+ cache-all-crates: true
+
+ - name: Install SpacetimeDB CLI from the local checkout
+ # Rebuild only if we didn't get a precise cache hit.
+ if: steps.cache-rust-deps.outputs.cache-hit == 'false'
+ run: |
+ cargo install --force --path crates/cli --locked --message-format=short
+ cargo install --force --path crates/standalone --locked --message-format=short
+ # Add a handy alias using the old binary name, so that we don't have to rewrite all scripts (incl. in submodules).
+ ln -sf $HOME/.cargo/bin/spacetimedb-cli $HOME/.cargo/bin/spacetime
+ env:
+ # Share the target directory with our local project to avoid rebuilding same SpacetimeDB crates twice.
+ CARGO_TARGET_DIR: demo/Blackholio/server-rust/target
+
+ - name: Generate client bindings
+ working-directory: demo/Blackholio/server-rust
+ run: bash ./generate.sh -y
+
+ - name: Check for changes
+ run: |
+ # This was copied from tools/check-diff.sh.
+ # It's required because `spacetime generate` creates lines with the SpacetimeDB commit
+ # version, which would make this `git diff` check very brittle if included.
+ PATTERN='^// This was generated using spacetimedb cli version.*'
+ git diff --exit-code --ignore-matching-lines="$PATTERN" -- demo/Blackholio/client-unity/Assets/Scripts/autogen || {
+ echo "Error: Bindings are dirty. Please generate bindings again and commit them to this branch."
+ exit 1
+ }
+
+ - name: Check Unity meta files
+ uses: DeNA/unity-meta-check@v3
+ with:
+ enable_pr_comment: ${{ github.event_name == 'pull_request' }}
+ target_path: sdks/csharp
+ env:
+ GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}"
+
+ - name: Start SpacetimeDB
+ run: |
+ spacetime start &
+ disown
+
+ - name: Run regression tests
+ run: bash sdks/csharp/tools~/run-regression-tests.sh .
+
+ - name: Publish unity-tests module to SpacetimeDB
+ working-directory: demo/Blackholio/server-rust
+ run: |
+ spacetime logout && spacetime login --server-issued-login local
+ bash ./publish.sh
+
+ - name: Patch com.clockworklabs.spacetimedbsdk dependency in manifest.json
+ working-directory: demo/Blackholio/client-unity/Packages
+ run: |
+ # Replace the com.clockworklabs.spacetimedbsdk dependency with the current branch.
+ # Note: Pointing to a local directory does not work, because our earlier steps nuke our meta files, which then causes Unity to not properly respect the DLLs (e.g.
+ # codegen does not work properly).
+ yq e -i '.dependencies["com.clockworklabs.spacetimedbsdk"] = "https://github.com/clockworklabs/SpacetimeDB.git?path=sdks/csharp#${{ github.head_ref }}"' manifest.json
+ cat manifest.json
+
+ - uses: actions/cache@v3
+ with:
+ path: demo/Blackholio/client-unity/Library
+ key: Unity-${{ github.head_ref }}
+ restore-keys: Unity-
+
+ - name: Run Unity tests
+ uses: game-ci/unity-test-runner@v4
+ with:
+ unityVersion: 2022.3.32f1 # Adjust Unity version to a valid tag
+ projectPath: demo/Blackholio/client-unity # Path to the Unity project subdirectory
+ githubToken: ${{ secrets.GITHUB_TOKEN }}
+ testMode: playmode
+ useHostNetwork: true
+ env:
+ UNITY_EMAIL: ${{ secrets.UNITY_EMAIL }}
+ UNITY_PASSWORD: ${{ secrets.UNITY_PASSWORD }}
+ UNITY_SERIAL: ${{ secrets.UNITY_SERIAL }}
diff --git a/.github/workflows/docs-check-links.yml b/.github/workflows/docs-check-links.yml
new file mode 100644
index 00000000000..62f55eac51d
--- /dev/null
+++ b/.github/workflows/docs-check-links.yml
@@ -0,0 +1,30 @@
+name: Docs - Check Link Validity
+
+on:
+ pull_request:
+ push:
+ branches:
+ - master
+ merge_group:
+
+jobs:
+ check-links:
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v3
+
+ - name: Set up Node.js
+ uses: actions/setup-node@v3
+ with:
+ node-version: '16' # or the version of Node.js you're using
+
+ - name: Install dependencies
+ working-directory: docs
+ run: |
+ npm install
+
+ - name: Run link check
+ working-directory: docs
+ run: |
+ npm run check-links
diff --git a/.github/workflows/docs-validate-nav-build.yml b/.github/workflows/docs-validate-nav-build.yml
new file mode 100644
index 00000000000..674ca22a6ae
--- /dev/null
+++ b/.github/workflows/docs-validate-nav-build.yml
@@ -0,0 +1,47 @@
+name: Docs - Validate nav.ts Matches nav.js
+
+on:
+ pull_request:
+ push:
+ branches:
+ - master
+ merge_group:
+
+jobs:
+ validate-build:
+ runs-on: ubuntu-latest
+
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v3
+
+ - name: Set up Node.js
+ uses: actions/setup-node@v3
+ with:
+ node-version: '16'
+
+ - name: Install dependencies
+ working-directory: docs
+ run: |
+ npm install
+
+ - name: Backup existing nav.js
+ working-directory: docs
+ run: |
+ mv docs/nav.js docs/nav.js.original
+
+ - name: Build nav.ts
+ working-directory: docs
+ run: |
+ npm run build
+
+ - name: Compare generated nav.js with original nav.js
+ working-directory: docs
+ run: |
+ diff -q docs/nav.js docs/nav.js.original || (echo "Generated nav.js differs from committed version. Run 'npm run build' and commit the updated file." && exit 1)
+
+ - name: Restore original nav.js
+ working-directory: docs
+ if: success() || failure()
+ run: |
+ mv docs/nav.js.original docs/nav.js
diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml
new file mode 100644
index 00000000000..72c1f153f02
--- /dev/null
+++ b/.github/workflows/lint.yml
@@ -0,0 +1,43 @@
+name: TypeScript - Lint
+
+on:
+ pull_request:
+ push:
+ branches:
+ - master
+
+jobs:
+ build:
+ runs-on: ubuntu-latest
+
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v4
+
+ - name: Set up Node.js
+ uses: actions/setup-node@v4
+ with:
+ node-version: 18
+
+ - uses: pnpm/action-setup@v4
+ with:
+ version: 9.7
+ run_install: true
+
+ - name: Get pnpm store directory
+ working-directory: sdks/typescript
+ shell: bash
+ run: |
+ echo "STORE_PATH=$(pnpm store path --silent)" >> $GITHUB_ENV
+
+ - uses: actions/cache@v4
+ name: Setup pnpm cache
+ with:
+ path: ${{ env.STORE_PATH }}
+ key: ${{ runner.os }}-pnpm-store-${{ hashFiles('**/pnpm-lock.yaml') }}
+ restore-keys: |
+ ${{ runner.os }}-pnpm-store-
+
+ - name: Lint
+ working-directory: sdks/typescript
+ run: pnpm lint
diff --git a/.github/workflows/pr-only-ci.yml b/.github/workflows/pr-only-ci.yml
index 5aab0510c08..74e1a12e68f 100644
--- a/.github/workflows/pr-only-ci.yml
+++ b/.github/workflows/pr-only-ci.yml
@@ -31,18 +31,12 @@ jobs:
with:
ref: ${{ env.GIT_REF }}
- uses: dsherret/rust-toolchain-file@v1
- - name: Checkout docs
- uses: actions/checkout@v4
- with:
- repository: clockworklabs/spacetime-docs
- ref: master
- path: spacetime-docs
- name: Check for docs change
run: |
- cargo run --features markdown-docs -p spacetimedb-cli > spacetime-docs/docs/cli-reference.md
- cd spacetime-docs
+ cargo run --features markdown-docs -p spacetimedb-cli > docs/docs/cli-reference.md
+ cd docs
# This is needed because our website doesn't render markdown quite properly.
- # See the README in spacetime-docs for more details.
+ # See the README in docs for more details.
sed -i'' -E 's!^(##) `(.*)`$!\1 \2!' docs/cli-reference.md
sed -i'' -E 's!^(######) \*\*(.*)\*\*$!\1 \2!' docs/cli-reference.md
git status
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
new file mode 100644
index 00000000000..a73cda9a87e
--- /dev/null
+++ b/.github/workflows/test.yml
@@ -0,0 +1,142 @@
+name: TypeScript - Tests
+
+on:
+ push:
+ branches:
+ - master
+ pull_request:
+
+jobs:
+ compile-and-test:
+ runs-on: ubuntu-latest
+
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v4
+
+ - name: Set up Node.js
+ uses: actions/setup-node@v4
+ with:
+ node-version: 18
+
+ - uses: pnpm/action-setup@v4
+ with:
+ version: 9.7
+ run_install: true
+
+ - name: Get pnpm store directory
+ shell: bash
+ working-directory: sdks/typescript
+ run: |
+ echo "STORE_PATH=$(pnpm store path --silent)" >> $GITHUB_ENV
+
+ - uses: actions/cache@v4
+ name: Setup pnpm cache
+ with:
+ path: ${{ env.STORE_PATH }}
+ key: ${{ runner.os }}-pnpm-store-${{ hashFiles('**/pnpm-lock.yaml') }}
+ restore-keys: |
+ ${{ runner.os }}-pnpm-store-
+
+ - name: Compile
+ working-directory: sdks/typescript
+ run: pnpm compile
+
+ - name: Run sdk tests
+ working-directory: sdks/typescript/packages/sdk
+ run: pnpm test
+
+ # - name: Extract SpacetimeDB branch name from file
+ # id: extract-branch
+ # run: |
+ # # Define the path to the branch file
+ # BRANCH_FILE=".github/spacetimedb-branch.txt"
+
+ # # Default to master if file doesn't exist
+ # if [ ! -f "$BRANCH_FILE" ]; then
+ # echo "::notice::No SpacetimeDB branch file found, using 'master'"
+ # echo "branch=master" >> $GITHUB_OUTPUT
+ # exit 0
+ # fi
+
+ # # Read and trim whitespace from the file
+ # branch=$(cat "$BRANCH_FILE" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//')
+
+ # # Fallback to master if empty
+ # if [ -z "$branch" ]; then
+ # echo "::warning::SpacetimeDB branch file is empty, using 'master'"
+ # branch="master"
+ # fi
+
+ # echo "branch=$branch" >> $GITHUB_OUTPUT
+ # echo "Using SpacetimeDB branch from file: $branch"
+
+ - name: Install Rust toolchain
+ uses: dtolnay/rust-toolchain@stable
+
+ - name: Cache Rust dependencies
+ uses: Swatinem/rust-cache@v2
+ with:
+ workspaces: modules/quickstart-chat
+ shared-key: quickstart-chat-test
+
+ - name: Install SpacetimeDB CLI from the local checkout
+ run: |
+ cargo install --force --path crates/cli --locked --message-format=short
+ cargo install --force --path crates/standalone --locked --message-format=short
+ # Add a handy alias using the old binary name, so that we don't have to rewrite all scripts (incl. in submodules).
+ rm -f $HOME/.cargo/bin/spacetime
+ ln -s $HOME/.cargo/bin/spacetimedb-cli $HOME/.cargo/bin/spacetime
+ # Clear any existing information
+ spacetime server clear -y
+ env:
+ # Share the target directory with our local project to avoid rebuilding same SpacetimeDB crates twice.
+ CARGO_TARGET_DIR: modules/quickstart-chat/target
+
+ - name: Generate client bindings
+ working-directory: modules/quickstart-chat
+ run: |
+ spacetime generate --lang typescript --out-dir ../../sdks/typescript/examples/quickstart-chat/src/module_bindings
+ cd ../../sdks/typescript
+ pnpm lint --write
+
+ - name: Check for changes
+ working-directory: sdks/typescript
+ run: |
+ # This was copied from SpacetimeDB/tools/check-diff.sh.
+ # It's required because `spacetime generate` creates lines with the SpacetimeDB commit
+ # version, which would make this `git diff` check very brittle if included.
+ PATTERN='^// This was generated using spacetimedb cli version.*'
+ if ! git diff --exit-code --ignore-matching-lines="$PATTERN" -- examples/quickstart-chat/src/module_bindings; then
+ echo "Error: Bindings are dirty. Please generate bindings again and commit them to this branch."
+ exit 1
+ fi
+
+ # - name: Start SpacetimeDB
+ # run: |
+ # spacetime start &
+ # disown
+
+ # - name: Publish module to SpacetimeDB
+ # working-directory: SpacetimeDB/modules/quickstart-chat
+ # run: |
+ # spacetime logout && spacetime login --server-issued-login local
+ # spacetime publish -s local quickstart-chat -c -y
+
+ # - name: Publish module to SpacetimeDB
+ # working-directory: SpacetimeDB/modules/quickstart-chat
+ # run: |
+ # spacetime logs quickstart-chat
+
+ - name: Check that quickstart-chat builds
+ working-directory: sdks/typescript/examples/quickstart-chat
+ run: pnpm build
+
+ # - name: Run quickstart-chat tests
+ # working-directory: examples/quickstart-chat
+ # run: pnpm test
+ #
+ # # Run this step always, even if the previous steps fail
+ # - name: Print rows in the user table
+ # if: always()
+ # run: spacetime sql quickstart-chat "SELECT * FROM user"
diff --git a/Cargo.lock b/Cargo.lock
index 6c3e66c3aa4..62ee3055a79 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -429,6 +429,26 @@ dependencies = [
"serde",
]
+[[package]]
+name = "bindgen"
+version = "0.71.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5f58bf3d7db68cfbac37cfc485a8d711e87e064c3d0fe0435b92f7a407f9d6b3"
+dependencies = [
+ "bitflags 2.9.0",
+ "cexpr",
+ "clang-sys",
+ "itertools 0.12.1",
+ "log",
+ "prettyplease",
+ "proc-macro2",
+ "quote",
+ "regex",
+ "rustc-hash",
+ "shlex",
+ "syn 2.0.101",
+]
+
[[package]]
name = "bit-set"
version = "0.5.3"
@@ -712,6 +732,15 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c"
+[[package]]
+name = "cexpr"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
+dependencies = [
+ "nom",
+]
+
[[package]]
name = "cfg-if"
version = "1.0.0"
@@ -776,6 +805,17 @@ dependencies = [
"inout",
]
+[[package]]
+name = "clang-sys"
+version = "1.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4"
+dependencies = [
+ "glob",
+ "libc",
+ "libloading",
+]
+
[[package]]
name = "clap"
version = "3.2.23"
@@ -1541,6 +1581,12 @@ dependencies = [
"syn 2.0.101",
]
+[[package]]
+name = "dissimilar"
+version = "1.0.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "59f8e79d1fbf76bdfbde321e902714bf6c49df88a7dda6fc682fc2979226962d"
+
[[package]]
name = "duct"
version = "0.13.7"
@@ -1746,6 +1792,16 @@ dependencies = [
"serde",
]
+[[package]]
+name = "expect-test"
+version = "1.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "63af43ff4431e848fb47472a920f14fa71c24de13255a5692e93d4e90302acb0"
+dependencies = [
+ "dissimilar",
+ "once_cell",
+]
+
[[package]]
name = "fallible-iterator"
version = "0.2.0"
@@ -1886,6 +1942,16 @@ dependencies = [
"winapi",
]
+[[package]]
+name = "fslock"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "04412b8935272e3a9bae6f48c7bfff74c2911f60525404edfdd28e49884c3bfb"
+dependencies = [
+ "libc",
+ "winapi",
+]
+
[[package]]
name = "fuchsia-cprng"
version = "0.1.1"
@@ -2069,6 +2135,15 @@ version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2"
+[[package]]
+name = "gzip-header"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "95cc527b92e6029a62960ad99aa8a6660faa4555fe5f731aab13aa6a921795a2"
+dependencies = [
+ "crc32fast",
+]
+
[[package]]
name = "h2"
version = "0.3.26"
@@ -2488,7 +2563,7 @@ dependencies = [
"shlex",
"tempfile",
"version-compare",
- "which",
+ "which 4.4.2",
]
[[package]]
@@ -2953,6 +3028,16 @@ version = "0.2.172"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa"
+[[package]]
+name = "libloading"
+version = "0.8.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667"
+dependencies = [
+ "cfg-if",
+ "windows-targets 0.53.0",
+]
+
[[package]]
name = "libm"
version = "0.2.15"
@@ -3168,6 +3253,12 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
+[[package]]
+name = "minimal-lexical"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
+
[[package]]
name = "miniz_oxide"
version = "0.8.8"
@@ -3307,6 +3398,16 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451"
+[[package]]
+name = "nom"
+version = "7.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
+dependencies = [
+ "memchr",
+ "minimal-lexical",
+]
+
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
@@ -3901,6 +4002,16 @@ dependencies = [
"yansi",
]
+[[package]]
+name = "prettyplease"
+version = "0.2.34"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6837b9e10d61f45f987d50808f83d1ee3d206c66acf650c3e4ae2e1f6ddedf55"
+dependencies = [
+ "proc-macro2",
+ "syn 2.0.101",
+]
+
[[package]]
name = "proc-macro-crate"
version = "3.3.0"
@@ -5230,6 +5341,7 @@ dependencies = [
"spacetimedb-client-api",
"spacetimedb-core",
"spacetimedb-data-structures",
+ "spacetimedb-datastore",
"spacetimedb-execution",
"spacetimedb-lib",
"spacetimedb-paths",
@@ -5333,6 +5445,7 @@ name = "spacetimedb-client-api"
version = "1.2.0"
dependencies = [
"anyhow",
+ "async-stream",
"async-trait",
"axum",
"axum-extra",
@@ -5344,6 +5457,7 @@ dependencies = [
"futures",
"headers",
"http 1.3.1",
+ "humantime",
"hyper 1.6.0",
"hyper-util",
"itoa",
@@ -5351,6 +5465,7 @@ dependencies = [
"lazy_static",
"log",
"mime",
+ "pretty_assertions",
"prometheus",
"rand 0.9.1",
"regex",
@@ -5361,6 +5476,7 @@ dependencies = [
"spacetimedb-client-api-messages",
"spacetimedb-core",
"spacetimedb-data-structures",
+ "spacetimedb-datastore",
"spacetimedb-jsonwebtoken",
"spacetimedb-lib",
"spacetimedb-paths",
@@ -5369,6 +5485,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-tungstenite",
+ "toml 0.8.22",
"tower-http",
"tower-layer",
"tower-service",
@@ -5476,6 +5593,7 @@ dependencies = [
"enum-as-inner",
"enum-map",
"env_logger 0.10.2",
+ "expect-test",
"faststr",
"flate2",
"fs2",
@@ -5522,6 +5640,7 @@ dependencies = [
"spacetimedb-client-api-messages",
"spacetimedb-commitlog",
"spacetimedb-data-structures",
+ "spacetimedb-datastore",
"spacetimedb-durability",
"spacetimedb-execution",
"spacetimedb-expr",
@@ -5564,6 +5683,7 @@ dependencies = [
"url",
"urlencoding",
"uuid",
+ "v8",
"wasmtime",
]
@@ -5581,6 +5701,41 @@ dependencies = [
"thiserror 1.0.69",
]
+[[package]]
+name = "spacetimedb-datastore"
+version = "1.2.0"
+dependencies = [
+ "anyhow",
+ "bytes",
+ "derive_more",
+ "enum-as-inner",
+ "enum-map",
+ "itertools 0.12.1",
+ "lazy_static",
+ "log",
+ "once_cell",
+ "parking_lot 0.12.3",
+ "pretty_assertions",
+ "prometheus",
+ "proptest",
+ "smallvec",
+ "spacetimedb-commitlog",
+ "spacetimedb-data-structures",
+ "spacetimedb-durability",
+ "spacetimedb-execution",
+ "spacetimedb-lib",
+ "spacetimedb-metrics",
+ "spacetimedb-paths",
+ "spacetimedb-primitives",
+ "spacetimedb-sats",
+ "spacetimedb-schema",
+ "spacetimedb-snapshot",
+ "spacetimedb-table",
+ "strum",
+ "thin-vec",
+ "thiserror 1.0.69",
+]
+
[[package]]
name = "spacetimedb-durability"
version = "1.2.0"
@@ -5741,6 +5896,8 @@ dependencies = [
"anyhow",
"derive_more",
"either",
+ "expect-test",
+ "itertools 0.12.1",
"pretty_assertions",
"spacetimedb-expr",
"spacetimedb-lib",
@@ -5885,6 +6042,7 @@ dependencies = [
"rand 0.9.1",
"scopeguard",
"spacetimedb-core",
+ "spacetimedb-datastore",
"spacetimedb-durability",
"spacetimedb-fs-utils",
"spacetimedb-lib",
@@ -5929,12 +6087,14 @@ dependencies = [
"parse-size",
"prometheus",
"scopeguard",
+ "serde",
"serde_json",
"sled",
"socket2",
"spacetimedb-client-api",
"spacetimedb-client-api-messages",
"spacetimedb-core",
+ "spacetimedb-datastore",
"spacetimedb-lib",
"spacetimedb-paths",
"spacetimedb-table",
@@ -6150,6 +6310,7 @@ dependencies = [
"rusqlite",
"rust_decimal",
"spacetimedb-core",
+ "spacetimedb-datastore",
"spacetimedb-lib",
"spacetimedb-sats",
"spacetimedb-vm",
@@ -7187,6 +7348,22 @@ dependencies = [
"getrandom 0.3.2",
]
+[[package]]
+name = "v8"
+version = "137.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1ca393e2032ddba2a57169e15cac5d0a81cdb3d872a8886f4468bc0f486098d2"
+dependencies = [
+ "bindgen",
+ "bitflags 2.9.0",
+ "fslock",
+ "gzip-header",
+ "home",
+ "miniz_oxide",
+ "paste",
+ "which 6.0.3",
+]
+
[[package]]
name = "valuable"
version = "0.1.1"
@@ -7661,6 +7838,18 @@ dependencies = [
"rustix 0.38.44",
]
+[[package]]
+name = "which"
+version = "6.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b4ee928febd44d98f2f459a4a79bd4d928591333a494a10a868418ac1b39cf1f"
+dependencies = [
+ "either",
+ "home",
+ "rustix 0.38.44",
+ "winsafe",
+]
+
[[package]]
name = "whoami"
version = "1.6.0"
@@ -8143,6 +8332,12 @@ dependencies = [
"windows-sys 0.48.0",
]
+[[package]]
+name = "winsafe"
+version = "0.0.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d135d17ab770252ad95e9a872d365cf3090e3be864a34ab46f48555993efc904"
+
[[package]]
name = "wit-bindgen-rt"
version = "0.39.0"
diff --git a/Cargo.toml b/Cargo.toml
index 5b66b23bbab..7934d987422 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,6 +11,7 @@ members = [
"crates/commitlog",
"crates/core",
"crates/data-structures",
+ "crates/datastore",
"crates/durability",
"crates/execution",
"crates/expr",
@@ -90,7 +91,7 @@ debug = true
version = "1.2.0"
edition = "2021"
# update rust-toolchain.toml too!
-rust-version = "1.84.0"
+rust-version = "1.88.0"
[workspace.dependencies]
spacetimedb = { path = "crates/bindings", version = "1.2.0" }
@@ -104,6 +105,7 @@ spacetimedb-codegen = { path = "crates/codegen", version = "1.2.0" }
spacetimedb-commitlog = { path = "crates/commitlog", version = "1.2.0" }
spacetimedb-core = { path = "crates/core", version = "1.2.0" }
spacetimedb-data-structures = { path = "crates/data-structures", version = "1.2.0" }
+spacetimedb-datastore = { path = "crates/datastore", version = "1.2.0" }
spacetimedb-durability = { path = "crates/durability", version = "1.2.0" }
spacetimedb-execution = { path = "crates/execution", version = "1.2.0" }
spacetimedb-expr = { path = "crates/expr", version = "1.2.0" }
@@ -168,6 +170,7 @@ enum-as-inner = "0.6"
enum-map = "2.6.3"
env_logger = "0.10"
ethnum = { version = "1.5.0", features = ["serde"] }
+expect-test = "1.5.0"
flate2 = "1.0.24"
foldhash = "0.1.4"
fs-err = "2.9.0"
@@ -283,6 +286,7 @@ unicode-normalization = "0.1.23"
url = "2.3.1"
urlencoding = "2.1.2"
uuid = { version = "1.2.1", features = ["v4"] }
+v8 = "137.2"
walkdir = "2.2.5"
wasmbin = "0.6"
webbrowser = "1.0.2"
@@ -324,3 +328,10 @@ features = [
"broadcast",
"ondemand",
]
+
+[workspace.lints.rust]
+unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
+
+[workspace.lints.clippy]
+# FIXME: we should work on this lint incrementally
+result_large_err = "allow"
diff --git a/crates/auth/Cargo.toml b/crates/auth/Cargo.toml
index 775830ebbfb..a5592c08f5f 100644
--- a/crates/auth/Cargo.toml
+++ b/crates/auth/Cargo.toml
@@ -16,3 +16,6 @@ jsonwebtoken.workspace = true
[dev-dependencies]
serde_json.workspace = true
+
+[lints]
+workspace = true
diff --git a/crates/bench/Cargo.toml b/crates/bench/Cargo.toml
index ce6a138c2c4..2febc5b44cf 100644
--- a/crates/bench/Cargo.toml
+++ b/crates/bench/Cargo.toml
@@ -40,6 +40,7 @@ bench = false
spacetimedb-client-api = { path = "../client-api" }
spacetimedb-core = { path = "../core", features = ["test"] }
spacetimedb-data-structures.workspace = true
+spacetimedb-datastore.workspace = true
spacetimedb-execution = { path = "../execution" }
spacetimedb-lib = { path = "../lib" }
spacetimedb-paths.workspace = true
@@ -89,3 +90,6 @@ tikv-jemalloc-ctl = { workspace = true }
iai-callgrind = { git = "https://github.com/clockworklabs/iai-callgrind.git", branch = "main" }
iai-callgrind-runner = { git = "https://github.com/clockworklabs/iai-callgrind.git", branch = "main" }
iai-callgrind-macros = { git = "https://github.com/clockworklabs/iai-callgrind.git", branch = "main" }
+
+[lints]
+workspace = true
diff --git a/crates/bench/Dockerfile b/crates/bench/Dockerfile
index 061cb337815..2c4ca4c5b7f 100644
--- a/crates/bench/Dockerfile
+++ b/crates/bench/Dockerfile
@@ -3,7 +3,7 @@
# See the README for commands to run.
# sync with: ../../rust-toolchain.toml
-FROM rust:1.84.0
+FROM rust:1.88.0
RUN apt-get update && \
apt-get install -y valgrind bash && \
diff --git a/crates/bench/benches/delete_table.rs b/crates/bench/benches/delete_table.rs
index 2dfe1962afc..7d63bb8da1f 100644
--- a/crates/bench/benches/delete_table.rs
+++ b/crates/bench/benches/delete_table.rs
@@ -7,8 +7,8 @@ use criterion::{
use itertools::Itertools;
use rand::{prelude::*, seq::SliceRandom};
use smallvec::SmallVec;
-use spacetimedb::db::datastore::locking_tx_datastore::delete_table;
use spacetimedb_data_structures::map::HashSet;
+use spacetimedb_datastore::locking_tx_datastore::delete_table;
use spacetimedb_sats::layout::Size;
use spacetimedb_table::indexes::{PageIndex, PageOffset, RowPointer, SquashedOffset};
use std::collections::BTreeSet;
diff --git a/crates/bench/benches/subscription.rs b/crates/bench/benches/subscription.rs
index 5fe56e1a9e5..6a44916e287 100644
--- a/crates/bench/benches/subscription.rs
+++ b/crates/bench/benches/subscription.rs
@@ -1,6 +1,5 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use spacetimedb::error::DBError;
-use spacetimedb::execution_context::Workload;
use spacetimedb::host::module_host::DatabaseTableUpdate;
use spacetimedb::identity::AuthCtx;
use spacetimedb::messages::websocket::BsatnFormat;
@@ -12,6 +11,7 @@ use spacetimedb::subscription::{collect_table_update, TableUpdateType};
use spacetimedb::{db::relational_db::RelationalDB, messages::websocket::Compression};
use spacetimedb_bench::database::BenchDatabase as _;
use spacetimedb_bench::spacetime_raw::SpacetimeRaw;
+use spacetimedb_datastore::execution_context::Workload;
use spacetimedb_execution::pipelined::PipelinedProject;
use spacetimedb_primitives::{col_list, TableId};
use spacetimedb_query::compile_subscription;
diff --git a/crates/bench/src/bin/summarize.rs b/crates/bench/src/bin/summarize.rs
index eb3040aeec9..72dd32d0ba9 100644
--- a/crates/bench/src/bin/summarize.rs
+++ b/crates/bench/src/bin/summarize.rs
@@ -99,7 +99,7 @@ mod criterion {
/// `"{crit_dir}/"{name}.json"`
fn packed_baseline_json_path(crit_dir: &Path, name: &str) -> PathBuf {
- crit_dir.join(format!("{}.json", name))
+ crit_dir.join(format!("{name}.json"))
}
lazy_static::lazy_static! {
@@ -392,7 +392,7 @@ mod callgrind {
}
fn packed_json_path(iai_callgrind_dir: &Path, name: &str) -> PathBuf {
- iai_callgrind_dir.join(format!("{}.json", name))
+ iai_callgrind_dir.join(format!("{name}.json"))
}
fn get_iai_callgrind_dir(target_dir: &Path) -> PathBuf {
diff --git a/crates/bench/src/spacetime_module.rs b/crates/bench/src/spacetime_module.rs
index 0b67162cce8..0c0ab892895 100644
--- a/crates/bench/src/spacetime_module.rs
+++ b/crates/bench/src/spacetime_module.rs
@@ -70,10 +70,10 @@ impl BenchDatabase for SpacetimeModule {
});
for table in module.client.module.info.module_def.tables() {
- log::trace!("SPACETIME_MODULE: LOADED TABLE: {:?}", table);
+ log::trace!("SPACETIME_MODULE: LOADED TABLE: {table:?}");
}
for reducer in module.client.module.info.module_def.reducers() {
- log::trace!("SPACETIME_MODULE: LOADED REDUCER: {:?}", reducer);
+ log::trace!("SPACETIME_MODULE: LOADED REDUCER: {reducer:?}");
}
Ok(SpacetimeModule {
runtime,
diff --git a/crates/bench/src/spacetime_raw.rs b/crates/bench/src/spacetime_raw.rs
index 3de9bab3f09..804dccfe88b 100644
--- a/crates/bench/src/spacetime_raw.rs
+++ b/crates/bench/src/spacetime_raw.rs
@@ -4,7 +4,7 @@ use crate::{
ResultBench,
};
use spacetimedb::db::relational_db::{tests_utils::TestDB, RelationalDB};
-use spacetimedb::execution_context::Workload;
+use spacetimedb_datastore::execution_context::Workload;
use spacetimedb_primitives::{ColId, IndexId, TableId};
use spacetimedb_sats::{bsatn, AlgebraicValue};
use spacetimedb_schema::{
diff --git a/crates/bindings-macro/Cargo.toml b/crates/bindings-macro/Cargo.toml
index 4fc2acbccec..9e0094df23a 100644
--- a/crates/bindings-macro/Cargo.toml
+++ b/crates/bindings-macro/Cargo.toml
@@ -19,3 +19,6 @@ proc-macro2.workspace = true
quote.workspace = true
syn.workspace = true
heck.workspace = true
+
+[lints]
+workspace = true
diff --git a/crates/bindings-macro/src/sats.rs b/crates/bindings-macro/src/sats.rs
index 98c608306b3..2d919431f98 100644
--- a/crates/bindings-macro/src/sats.rs
+++ b/crates/bindings-macro/src/sats.rs
@@ -347,7 +347,7 @@ pub(crate) fn derive_deserialize(ty: &SatsType<'_>) -> TokenStream {
de_generics.params.insert(0, de_lt_param.into());
let (de_impl_generics, _, de_where_clause) = de_generics.split_for_impl();
- let (iter_n, iter_n2, iter_n3) = (0usize.., 0usize.., 0usize..);
+ let (iter_n, iter_n2, iter_n3, iter_n4) = (0usize.., 0usize.., 0usize.., 0usize..);
match &ty.data {
SatsTypeData::Product(fields) => {
@@ -443,8 +443,8 @@ pub(crate) fn derive_deserialize(ty: &SatsType<'_>) -> TokenStream {
impl #de_impl_generics #spacetimedb_lib::de::FieldNameVisitor<'de> for __ProductVisitor #ty_generics #de_where_clause {
type Output = __ProductFieldIdent;
- fn field_names(&self, names: &mut dyn #spacetimedb_lib::de::ValidNames) {
- names.extend::<&[&str]>(&[#(#field_strings),*])
+ fn field_names(&self) -> impl '_ + Iterator- > {
+ [#(#field_strings),*].into_iter().map(Some)
}
fn visit<__E: #spacetimedb_lib::de::Error>(self, name: &str) -> Result {
@@ -453,6 +453,13 @@ pub(crate) fn derive_deserialize(ty: &SatsType<'_>) -> TokenStream {
_ => Err(#spacetimedb_lib::de::Error::unknown_field_name(name, &self)),
}
}
+
+ fn visit_seq<__E: #spacetimedb_lib::de::Error>(self, index: usize, name: &str) -> Result {
+ match index {
+ #(#iter_n4 => Ok(__ProductFieldIdent::#field_names),)*
+ _ => Err(#spacetimedb_lib::de::Error::unknown_field_name(name, &self)),
+ }
+ }
}
#[allow(non_camel_case_types)]
@@ -516,11 +523,11 @@ pub(crate) fn derive_deserialize(ty: &SatsType<'_>) -> TokenStream {
#(#variant_idents,)*
}
- impl #de_impl_generics #spacetimedb_lib::de::VariantVisitor for __SumVisitor #ty_generics #de_where_clause {
+ impl #de_impl_generics #spacetimedb_lib::de::VariantVisitor<'de> for __SumVisitor #ty_generics #de_where_clause {
type Output = __Variant;
- fn variant_names(&self, names: &mut dyn #spacetimedb_lib::de::ValidNames) {
- names.extend::<&[&str]>(&[#(#variant_names,)*])
+ fn variant_names(&self) -> impl '_ + Iterator
- {
+ [#(#variant_names,)*].into_iter()
}
fn visit_tag(self, __tag: u8) -> Result {
diff --git a/crates/bindings-macro/src/table.rs b/crates/bindings-macro/src/table.rs
index 3da4bee2e67..882ed27ff0f 100644
--- a/crates/bindings-macro/src/table.rs
+++ b/crates/bindings-macro/src/table.rs
@@ -800,7 +800,7 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R
if std::env::var("PROC_MACRO_DEBUG").is_ok() {
{
#![allow(clippy::disallowed_macros)]
- println!("{}", emission);
+ println!("{emission}");
}
}
diff --git a/crates/bindings-macro/src/util.rs b/crates/bindings-macro/src/util.rs
index 8f1d7225ce3..41c9dad016d 100644
--- a/crates/bindings-macro/src/util.rs
+++ b/crates/bindings-macro/src/util.rs
@@ -76,7 +76,7 @@ pub(crate) fn one_of(options: &[crate::sym::Symbol]) -> String {
}
_ => {
let join = options.join("`, `");
- format!("expected one of: `{}`", join)
+ format!("expected one of: `{join}`")
}
}
}
diff --git a/crates/bindings-sys/Cargo.toml b/crates/bindings-sys/Cargo.toml
index 23758e89df7..ee8fb474b6d 100644
--- a/crates/bindings-sys/Cargo.toml
+++ b/crates/bindings-sys/Cargo.toml
@@ -15,3 +15,6 @@ unstable = []
[dependencies]
spacetimedb-primitives.workspace = true
+
+[lints]
+workspace = true
diff --git a/crates/bindings-sys/src/lib.rs b/crates/bindings-sys/src/lib.rs
index a9cc8c968e5..b901de7e5d7 100644
--- a/crates/bindings-sys/src/lib.rs
+++ b/crates/bindings-sys/src/lib.rs
@@ -148,7 +148,7 @@ pub mod raw {
///
/// Traps if:
/// - `prefix_elems > 0`
- /// and (`prefix_ptr` is NULL or `prefix` is not in bounds of WASM memory).
+ /// and (`prefix_ptr` is NULL or `prefix` is not in bounds of WASM memory).
/// - `rstart` is NULL or `rstart` is not in bounds of WASM memory.
/// - `rend` is NULL or `rend` is not in bounds of WASM memory.
/// - `out` is NULL or `out[..size_of::()]` is not in bounds of WASM memory.
@@ -161,11 +161,11 @@ pub mod raw {
/// - `NO_SUCH_INDEX`, when `index_id` is not a known ID of an index.
/// - `WRONG_INDEX_ALGO` if the index is not a range-compatible index.
/// - `BSATN_DECODE_ERROR`, when `prefix` cannot be decoded to
- /// a `prefix_elems` number of `AlgebraicValue`
- /// typed at the initial `prefix_elems` `AlgebraicType`s of the index's key type.
- /// Or when `rstart` or `rend` cannot be decoded to an `Bound`
- /// where the inner `AlgebraicValue`s are
- /// typed at the `prefix_elems + 1` `AlgebraicType` of the index's key type.
+ /// a `prefix_elems` number of `AlgebraicValue`
+ /// typed at the initial `prefix_elems` `AlgebraicType`s of the index's key type.
+ /// Or when `rstart` or `rend` cannot be decoded to an `Bound`
+ /// where the inner `AlgebraicValue`s are
+ /// typed at the `prefix_elems + 1` `AlgebraicType` of the index's key type.
pub fn datastore_index_scan_range_bsatn(
index_id: IndexId,
prefix_ptr: *const u8,
@@ -212,7 +212,7 @@ pub mod raw {
///
/// Traps if:
/// - `prefix_elems > 0`
- /// and (`prefix_ptr` is NULL or `prefix` is not in bounds of WASM memory).
+ /// and (`prefix_ptr` is NULL or `prefix` is not in bounds of WASM memory).
/// - `rstart` is NULL or `rstart` is not in bounds of WASM memory.
/// - `rend` is NULL or `rend` is not in bounds of WASM memory.
/// - `out` is NULL or `out[..size_of::()]` is not in bounds of WASM memory.
@@ -225,11 +225,11 @@ pub mod raw {
/// - `NO_SUCH_INDEX`, when `index_id` is not a known ID of an index.
/// - `WRONG_INDEX_ALGO` if the index is not a range-compatible index.
/// - `BSATN_DECODE_ERROR`, when `prefix` cannot be decoded to
- /// a `prefix_elems` number of `AlgebraicValue`
- /// typed at the initial `prefix_elems` `AlgebraicType`s of the index's key type.
- /// Or when `rstart` or `rend` cannot be decoded to an `Bound`
- /// where the inner `AlgebraicValue`s are
- /// typed at the `prefix_elems + 1` `AlgebraicType` of the index's key type.
+ /// a `prefix_elems` number of `AlgebraicValue`
+ /// typed at the initial `prefix_elems` `AlgebraicType`s of the index's key type.
+ /// Or when `rstart` or `rend` cannot be decoded to an `Bound`
+ /// where the inner `AlgebraicValue`s are
+ /// typed at the `prefix_elems + 1` `AlgebraicType` of the index's key type.
pub fn datastore_delete_by_index_scan_range_bsatn(
index_id: IndexId,
prefix_ptr: *const u8,
@@ -364,7 +364,7 @@ pub mod raw {
/// - `NOT_IN_TRANSACTION`, when called outside of a transaction.
/// - `NO_SUCH_TABLE`, when `table_id` is not a known ID of a table.
/// - `BSATN_DECODE_ERROR`, when `row` cannot be decoded to a `ProductValue`.
- /// typed at the `ProductType` the table's schema specifies.
+ /// typed at the `ProductType` the table's schema specifies.
/// - `UNIQUE_ALREADY_EXISTS`, when inserting `row` would violate a unique constraint.
/// - `SCHEDULE_AT_DELAY_TOO_LONG`, when the delay specified in the row was too long.
pub fn datastore_insert_bsatn(table_id: TableId, row_ptr: *mut u8, row_len_ptr: *mut usize) -> u16;
@@ -406,8 +406,8 @@ pub mod raw {
/// - `INDEX_NOT_UNIQUE`, when the index was not unique.
/// - `NO_SUCH_ROW`, when the row was not found in the unique index.
/// - `BSATN_DECODE_ERROR`, when `row` cannot be decoded to a `ProductValue`
- /// typed at the `ProductType` the table's schema specifies
- /// or when it cannot be projected to the index identified by `index_id`.
+ /// typed at the `ProductType` the table's schema specifies
+ /// or when it cannot be projected to the index identified by `index_id`.
/// - `UNIQUE_ALREADY_EXISTS`, when inserting `row` would violate a unique constraint.
/// - `SCHEDULE_AT_DELAY_TOO_LONG`, when the delay specified in the row was too long.
pub fn datastore_update_bsatn(
@@ -942,11 +942,11 @@ pub fn datastore_table_scan_bsatn(table_id: TableId) -> Result {
/// - `NO_SUCH_INDEX`, when `index_id` is not a known ID of an index.
/// - `WRONG_INDEX_ALGO` if the index is not a range-compatible index.
/// - `BSATN_DECODE_ERROR`, when `prefix` cannot be decoded to
-/// a `prefix_elems` number of `AlgebraicValue`
-/// typed at the initial `prefix_elems` `AlgebraicType`s of the index's key type.
-/// Or when `rstart` or `rend` cannot be decoded to an `Bound`
-/// where the inner `AlgebraicValue`s are
-/// typed at the `prefix_elems + 1` `AlgebraicType` of the index's key type.
+/// a `prefix_elems` number of `AlgebraicValue`
+/// typed at the initial `prefix_elems` `AlgebraicType`s of the index's key type.
+/// Or when `rstart` or `rend` cannot be decoded to an `Bound`
+/// where the inner `AlgebraicValue`s are
+/// typed at the `prefix_elems + 1` `AlgebraicType` of the index's key type.
pub fn datastore_index_scan_range_bsatn(
index_id: IndexId,
prefix: &[u8],
@@ -990,11 +990,11 @@ pub fn datastore_index_scan_range_bsatn(
/// - `NO_SUCH_INDEX`, when `index_id` is not a known ID of an index.
/// - `WRONG_INDEX_ALGO` if the index is not a range-compatible index.
/// - `BSATN_DECODE_ERROR`, when `prefix` cannot be decoded to
-/// a `prefix_elems` number of `AlgebraicValue`
-/// typed at the initial `prefix_elems` `AlgebraicType`s of the index's key type.
-/// Or when `rstart` or `rend` cannot be decoded to an `Bound`
-/// where the inner `AlgebraicValue`s are
-/// typed at the `prefix_elems + 1` `AlgebraicType` of the index's key type.
+/// a `prefix_elems` number of `AlgebraicValue`
+/// typed at the initial `prefix_elems` `AlgebraicType`s of the index's key type.
+/// Or when `rstart` or `rend` cannot be decoded to an `Bound`
+/// where the inner `AlgebraicValue`s are
+/// typed at the `prefix_elems + 1` `AlgebraicType` of the index's key type.
pub fn datastore_delete_by_index_scan_range_bsatn(
index_id: IndexId,
prefix: &[u8],
diff --git a/crates/bindings/Cargo.toml b/crates/bindings/Cargo.toml
index 19cea313fac..8e75cc3fce3 100644
--- a/crates/bindings/Cargo.toml
+++ b/crates/bindings/Cargo.toml
@@ -39,3 +39,6 @@ getrandom02 = { workspace = true, optional = true, features = ["custom"] }
[dev-dependencies]
insta.workspace = true
trybuild.workspace = true
+
+[lints]
+workspace = true
diff --git a/crates/bindings/src/lib.rs b/crates/bindings/src/lib.rs
index 7729985acb1..4bd241ee732 100644
--- a/crates/bindings/src/lib.rs
+++ b/crates/bindings/src/lib.rs
@@ -810,7 +810,7 @@ const DEFAULT_BUFFER_CAPACITY: usize = spacetimedb_primitives::ROW_ITER_CHUNK_SI
#[doc(hidden)]
pub fn table_id_from_name(table_name: &str) -> TableId {
sys::table_id_from_name(table_name).unwrap_or_else(|_| {
- panic!("Failed to get table with name: {}", table_name);
+ panic!("Failed to get table with name: {table_name}");
})
}
diff --git a/crates/bindings/tests/deps.rs b/crates/bindings/tests/deps.rs
index b070f3ba372..2383fdc7a1e 100644
--- a/crates/bindings/tests/deps.rs
+++ b/crates/bindings/tests/deps.rs
@@ -2,45 +2,12 @@
//! to make sure we don't unknowingly add a bunch of dependencies here,
//! slowing down compilation for every spacetime module.
-// We need to remove the `cpufeatures` and `libc` dependencies from the output, it added on `arm` architecture:
-// https://github.com/RustCrypto/sponges/blob/master/keccak/Cargo.toml#L24-L25, breaking local testing.
-#[cfg(target_arch = "aarch64")]
-fn hack_keccack(cmd: String) -> String {
- let mut found = false;
- let mut lines = cmd.lines().peekable();
- let mut output = String::new();
-
- while let Some(line) = lines.next() {
- // Check we only match keccak/cpufeatures/libc
- if line.contains("keccak") {
- found = true;
- }
- if found && line.contains("cpufeatures") {
- if let Some(next_line) = lines.peek() {
- if next_line.contains("libc") {
- found = false;
- // Skip libc
- lines.next();
- continue;
- }
- }
- }
- output.push_str(line);
- output.push('\n');
- }
-
- output
-}
-#[cfg(not(target_arch = "aarch64"))]
-fn hack_keccack(cmd: String) -> String {
- cmd
-}
-
#[test]
fn deptree_snapshot() -> std::io::Result<()> {
- let cmd = "cargo tree -p spacetimedb -f {lib} -e no-dev";
- let deps_tree = hack_keccack(run_cmd(cmd));
- let all_deps = hack_keccack(run_cmd("cargo tree -p spacetimedb -e no-dev --prefix none --no-dedupe"));
+ let cmd_common = "cargo tree -p spacetimedb -e no-dev --color never --target wasm32-unknown-unknown";
+ let cmd = &format!("{cmd_common} -f {{lib}}");
+ let deps_tree = run_cmd(cmd);
+ let all_deps = run_cmd(&format!("{cmd_common} --prefix none --no-dedupe"));
let mut all_deps = all_deps.lines().collect::>();
all_deps.sort();
all_deps.dedup();
@@ -52,7 +19,7 @@ fn deptree_snapshot() -> std::io::Result<()> {
cmd
);
- let cmd = "cargo tree -p spacetimedb -e no-dev -d --depth 0";
+ let cmd = &format!("{cmd_common} -d --depth 0");
insta::assert_snapshot!("duplicate_deps", run_cmd(cmd), cmd);
Ok(())
diff --git a/crates/bindings/tests/snapshots/deps__spacetimedb_bindings_dependencies.snap b/crates/bindings/tests/snapshots/deps__spacetimedb_bindings_dependencies.snap
index 6c97bcc8708..6c1e5485c8e 100644
--- a/crates/bindings/tests/snapshots/deps__spacetimedb_bindings_dependencies.snap
+++ b/crates/bindings/tests/snapshots/deps__spacetimedb_bindings_dependencies.snap
@@ -1,8 +1,8 @@
---
source: crates/bindings/tests/deps.rs
-expression: "cargo tree -p spacetimedb -f {lib} -e no-dev"
+expression: "cargo tree -p spacetimedb -e no-dev --color never --target wasm32-unknown-unknown -f {lib}"
---
-total crates: 61
+total crates: 60
spacetimedb
├── bytemuck
├── derive_more
@@ -19,11 +19,9 @@ spacetimedb
│ └── rustc_version
│ └── semver
├── getrandom
-│ ├── cfg_if
-│ └── libc
+│ └── cfg_if
├── log
├── rand
-│ ├── libc
│ ├── rand_chacha
│ │ ├── ppv_lite86
│ │ │ └── zerocopy
@@ -44,7 +42,12 @@ spacetimedb
│ │ └── nohash_hasher
│ └── syn (*)
├── spacetimedb_bindings_sys
-│ └── spacetimedb_primitives (*)
+│ └── spacetimedb_primitives
+│ ├── bitflags
+│ ├── either
+│ ├── itertools
+│ │ └── either
+│ └── nohash_hasher
├── spacetimedb_lib
│ ├── anyhow
│ ├── bitflags
diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml
index 6f9d290206f..1ec691324ad 100644
--- a/crates/cli/Cargo.toml
+++ b/crates/cli/Cargo.toml
@@ -81,3 +81,6 @@ tikv-jemalloc-ctl = { workspace = true }
[target.'cfg(windows)'.dependencies]
windows-sys = { workspace = true, features = ["Win32_System_Console"] }
+
+[lints]
+workspace = true
diff --git a/crates/cli/build.rs b/crates/cli/build.rs
index f9fae8159a4..71e37105571 100644
--- a/crates/cli/build.rs
+++ b/crates/cli/build.rs
@@ -4,5 +4,5 @@ use std::process::Command;
fn main() {
let output = Command::new("git").args(["rev-parse", "HEAD"]).output().unwrap();
let git_hash = String::from_utf8(output.stdout).unwrap();
- println!("cargo:rustc-env=GIT_HASH={}", git_hash);
+ println!("cargo:rustc-env=GIT_HASH={git_hash}");
}
diff --git a/crates/cli/src/config.rs b/crates/cli/src/config.rs
index 1e709b892f6..233a1461dfc 100644
--- a/crates/cli/src/config.rs
+++ b/crates/cli/src/config.rs
@@ -395,7 +395,7 @@ Fetch the server's fingerprint with:
let cfg = self.find_server_mut(server)?;
let old_nickname = if let Some(new_nickname) = new_nickname {
- std::mem::replace(&mut cfg.nickname, Some(new_nickname.to_string()))
+ cfg.nickname.replace(new_nickname.to_string())
} else {
None
};
diff --git a/crates/cli/src/subcommands/call.rs b/crates/cli/src/subcommands/call.rs
index c32a3944c41..23b351766c5 100644
--- a/crates/cli/src/subcommands/call.rs
+++ b/crates/cli/src/subcommands/call.rs
@@ -16,10 +16,7 @@ use super::sql::parse_req;
pub fn cli() -> clap::Command {
clap::Command::new("call")
- .about(format!(
- "Invokes a reducer function in a database. {}",
- UNSTABLE_WARNING
- ))
+ .about(format!("Invokes a reducer function in a database. {UNSTABLE_WARNING}"))
.arg(
Arg::new("database")
.required(true)
@@ -38,7 +35,7 @@ pub fn cli() -> clap::Command {
}
pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), Error> {
- eprintln!("{}\n", UNSTABLE_WARNING);
+ eprintln!("{UNSTABLE_WARNING}\n");
let reducer_name = args.get_one::("reducer_name").unwrap();
let arguments = args.get_many::("arguments");
@@ -60,7 +57,7 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), Error> {
.zip(&*reducer_def.params.elements)
.map(|(argument, element)| match &element.algebraic_type {
AlgebraicType::String if !argument.starts_with('\"') || !argument.ends_with('\"') => {
- format!("\"{}\"", argument)
+ format!("\"{argument}\"")
}
_ => argument.to_string(),
});
@@ -74,7 +71,7 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), Error> {
bail!(e);
};
- let error = Err(e).context(format!("Response text: {}", response_text));
+ let error = Err(e).context(format!("Response text: {response_text}"));
let error_msg = if response_text.starts_with("no such reducer") {
no_such_reducer(&database_identity, database, reducer_name, &module_def)
@@ -106,8 +103,7 @@ fn invalid_arguments(
if let Some((actual, expected)) = find_actual_expected(text).filter(|(a, e)| a != e) {
write!(
error,
- "\n\n{} parameters were expected, but {} were provided.",
- expected, actual
+ "\n\n{expected} parameters were expected, but {actual} were provided."
)
.unwrap();
}
@@ -170,10 +166,8 @@ impl std::fmt::Display for ReducerSignature<'_> {
/// Returns an error message for when `reducer` does not exist in `db`.
fn no_such_reducer(database_identity: &Identity, db: &str, reducer: &str, module_def: &ModuleDef) -> String {
- let mut error = format!(
- "No such reducer `{}` for database `{}` resolving to identity `{}`.",
- reducer, db, database_identity
- );
+ let mut error =
+ format!("No such reducer `{reducer}` for database `{db}` resolving to identity `{database_identity}`.");
add_reducer_ctx_to_err(&mut error, module_def, reducer);
@@ -192,7 +186,7 @@ fn add_reducer_ctx_to_err(error: &mut String, module_def: &ModuleDef, reducer_na
.collect::>();
if let Some(best) = find_best_match_for_name(&reducers, reducer_name, None) {
- write!(error, "\n\nA reducer with a similar name exists: `{}`", best).unwrap();
+ write!(error, "\n\nA reducer with a similar name exists: `{best}`").unwrap();
} else if reducers.is_empty() {
write!(error, "\n\nThe database has no reducers.").unwrap();
} else {
@@ -207,13 +201,13 @@ fn add_reducer_ctx_to_err(error: &mut String, module_def: &ModuleDef, reducer_na
// List them.
write!(error, "\n\nHere are some existing reducers:").unwrap();
for candidate in reducers {
- write!(error, "\n- {}", candidate).unwrap();
+ write!(error, "\n- {candidate}").unwrap();
}
// When some where not listed, note that are more.
if too_many_to_show {
let plural = if diff == 1 { "" } else { "s" };
- write!(error, "\n... ({} reducer{} not shown)", diff, plural).unwrap();
+ write!(error, "\n... ({diff} reducer{plural} not shown)").unwrap();
}
}
}
diff --git a/crates/cli/src/subcommands/describe.rs b/crates/cli/src/subcommands/describe.rs
index 6f1759ff791..e271e636262 100644
--- a/crates/cli/src/subcommands/describe.rs
+++ b/crates/cli/src/subcommands/describe.rs
@@ -10,8 +10,7 @@ use spacetimedb_lib::sats;
pub fn cli() -> clap::Command {
clap::Command::new("describe")
.about(format!(
- "Describe the structure of a database or entities within it. {}",
- UNSTABLE_WARNING
+ "Describe the structure of a database or entities within it. {UNSTABLE_WARNING}"
))
.arg(
Arg::new("database")
@@ -53,7 +52,7 @@ enum EntityType {
}
pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error> {
- eprintln!("{}\n", UNSTABLE_WARNING);
+ eprintln!("{UNSTABLE_WARNING}\n");
let entity_name = args.get_one::("entity_name");
let entity_type = args.get_one::("entity_type");
diff --git a/crates/cli/src/subcommands/dns.rs b/crates/cli/src/subcommands/dns.rs
index ade5605f132..59e049b038b 100644
--- a/crates/cli/src/subcommands/dns.rs
+++ b/crates/cli/src/subcommands/dns.rs
@@ -50,19 +50,17 @@ pub async fn exec(mut config: Config, args: &ArgMatches) -> Result<(), anyhow::E
if !status.is_success() {
anyhow::bail!(match result {
SetDomainsResult::Success => "".to_string(),
- SetDomainsResult::PermissionDenied { domain } => format!("Permission denied for domain: {}", domain),
+ SetDomainsResult::PermissionDenied { domain } => format!("Permission denied for domain: {domain}"),
SetDomainsResult::PermissionDeniedOnAny { domains } =>
- format!("Permission denied for domains: {:?}", domains),
- SetDomainsResult::DatabaseNotFound => format!("Database {} not found", database_identity),
- SetDomainsResult::NotYourDatabase { .. } => format!(
- "You cannot rename {} because it is owned by another identity.",
- database_identity
- ),
+ format!("Permission denied for domains: {domains:?}"),
+ SetDomainsResult::DatabaseNotFound => format!("Database {database_identity} not found"),
+ SetDomainsResult::NotYourDatabase { .. } =>
+ format!("You cannot rename {database_identity} because it is owned by another identity."),
SetDomainsResult::OtherError(err) => err,
});
}
- println!("Name set to {} for identity {}.", domain, database_identity);
+ println!("Name set to {domain} for identity {database_identity}.");
Ok(())
}
diff --git a/crates/cli/src/subcommands/energy.rs b/crates/cli/src/subcommands/energy.rs
index 0d4249e2320..f4c5b7b8aef 100644
--- a/crates/cli/src/subcommands/energy.rs
+++ b/crates/cli/src/subcommands/energy.rs
@@ -8,8 +8,7 @@ use crate::util::{self, get_login_token_or_log_in, UNSTABLE_WARNING};
pub fn cli() -> clap::Command {
clap::Command::new("energy")
.about(format!(
- "Invokes commands related to database budgets. {}",
- UNSTABLE_WARNING
+ "Invokes commands related to database budgets. {UNSTABLE_WARNING}"
))
.args_conflicts_with_subcommands(true)
.subcommand_required(true)
@@ -42,7 +41,7 @@ async fn exec_subcommand(config: Config, cmd: &str, args: &ArgMatches) -> Result
pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error> {
let (cmd, subcommand_args) = args.subcommand().expect("Subcommand required");
- eprintln!("{}\n", UNSTABLE_WARNING);
+ eprintln!("{UNSTABLE_WARNING}\n");
exec_subcommand(config, cmd, subcommand_args).await
}
@@ -67,7 +66,7 @@ async fn exec_status(mut config: Config, args: &ArgMatches) -> Result<(), anyhow
.text()
.await?;
- println!("{}", status);
+ println!("{status}");
Ok(())
}
diff --git a/crates/cli/src/subcommands/init.rs b/crates/cli/src/subcommands/init.rs
index 22cedafd68c..8a087b81b9a 100644
--- a/crates/cli/src/subcommands/init.rs
+++ b/crates/cli/src/subcommands/init.rs
@@ -8,7 +8,7 @@ use std::path::{Path, PathBuf};
pub fn cli() -> clap::Command {
clap::Command::new("init")
- .about(format!("Initializes a new spacetime project. {}", UNSTABLE_WARNING))
+ .about(format!("Initializes a new spacetime project. {UNSTABLE_WARNING}"))
.arg(
Arg::new("project-path")
.value_parser(clap::value_parser!(PathBuf))
@@ -40,7 +40,7 @@ fn check_for_cargo() -> bool {
println!("{}", "Warning: You have created a rust project, but you are missing `cargo`. Visit https://www.rust-lang.org/tools/install for installation instructions:\n\n\tYou have created a rust project, but you are missing cargo.\n".yellow());
}
unsupported_os => {
- println!("{}", format!("This OS may be unsupported: {}", unsupported_os).yellow());
+ println!("{}", format!("This OS may be unsupported: {unsupported_os}").yellow());
}
}
false
@@ -107,14 +107,14 @@ fn check_for_git() -> bool {
println!("{}", "Warning: You are missing git. You should install git from here:\n\n\thttps://git-scm.com/download/win\n".yellow());
}
unsupported_os => {
- println!("{}", format!("This OS may be unsupported: {}", unsupported_os).yellow());
+ println!("{}", format!("This OS may be unsupported: {unsupported_os}").yellow());
}
}
false
}
pub async fn exec(_config: Config, args: &ArgMatches) -> Result<(), anyhow::Error> {
- eprintln!("{}\n", UNSTABLE_WARNING);
+ eprintln!("{UNSTABLE_WARNING}\n");
let project_path = args.get_one::("project-path").unwrap();
let project_lang = *args.get_one::("lang").unwrap();
diff --git a/crates/cli/src/subcommands/list.rs b/crates/cli/src/subcommands/list.rs
index 8080af3c790..d650e98f52d 100644
--- a/crates/cli/src/subcommands/list.rs
+++ b/crates/cli/src/subcommands/list.rs
@@ -16,8 +16,7 @@ use tabled::{
pub fn cli() -> Command {
Command::new("list")
.about(format!(
- "Lists the databases attached to an identity. {}",
- UNSTABLE_WARNING
+ "Lists the databases attached to an identity. {UNSTABLE_WARNING}"
))
.arg(common_args::server().help("The nickname, host name or URL of the server from which to list databases"))
.arg(common_args::yes())
@@ -35,7 +34,7 @@ struct IdentityRow {
}
pub async fn exec(mut config: Config, args: &ArgMatches) -> Result<(), anyhow::Error> {
- eprintln!("{}\n", UNSTABLE_WARNING);
+ eprintln!("{UNSTABLE_WARNING}\n");
let server = args.get_one::("server").map(|s| s.as_ref());
let force = args.get_flag("force");
@@ -63,10 +62,10 @@ pub async fn exec(mut config: Config, args: &ArgMatches) -> Result<(), anyhow::E
table
.with(Style::psql())
.with(Modify::new(Columns::first()).with(Alignment::left()));
- println!("Associated database identities for {}:\n", identity);
- println!("{}", table);
+ println!("Associated database identities for {identity}:\n");
+ println!("{table}");
} else {
- println!("No databases found for {}.", identity);
+ println!("No databases found for {identity}.");
}
Ok(())
diff --git a/crates/cli/src/subcommands/login.rs b/crates/cli/src/subcommands/login.rs
index 8e2568b478a..a05afe33072 100644
--- a/crates/cli/src/subcommands/login.rs
+++ b/crates/cli/src/subcommands/login.rs
@@ -89,10 +89,10 @@ async fn exec_show(config: Config, args: &ArgMatches) -> Result<(), anyhow::Erro
};
let identity = decode_identity(token)?;
- println!("You are logged in as {}", identity);
+ println!("You are logged in as {identity}");
if include_token {
- println!("Your auth token (don't share this!) is {}", token);
+ println!("Your auth token (don't share this!) is {token}");
}
Ok(())
@@ -213,7 +213,7 @@ async fn web_login(remote: &Url) -> Result {
browser_url
.query_pairs_mut()
.append_pair("token", web_login_request_token);
- println!("Opening {} in your browser.", browser_url);
+ println!("Opening {browser_url} in your browser.");
if webbrowser::open(browser_url.as_str()).is_err() {
println!("Unable to open your browser! Please open the URL above manually.");
}
@@ -251,7 +251,7 @@ async fn spacetimedb_login(remote: &Url, web_session_token: &String) -> Result
Result<(), anyhow::E
let client = reqwest::Client::new();
client
.post(host.join("auth/cli/logout")?)
- .header("Authorization", format!("Bearer {}", web_session_token))
+ .header("Authorization", format!("Bearer {web_session_token}"))
.send()
.await?;
}
diff --git a/crates/cli/src/subcommands/logs.rs b/crates/cli/src/subcommands/logs.rs
index cdfa2651b2a..f517ec6317a 100644
--- a/crates/cli/src/subcommands/logs.rs
+++ b/crates/cli/src/subcommands/logs.rs
@@ -130,7 +130,7 @@ pub async fn exec(mut config: Config, args: &ArgMatches) -> Result<(), anyhow::E
let host_url = config.get_host_url(server)?;
- let builder = reqwest::Client::new().get(format!("{}/v1/database/{}/logs", host_url, database_identity));
+ let builder = reqwest::Client::new().get(format!("{host_url}/v1/database/{database_identity}/logs"));
let builder = add_auth_header_opt(builder, &auth_header);
let mut res = builder.query(&query_params).send().await?;
let status = res.status();
@@ -156,10 +156,7 @@ pub async fn exec(mut config: Config, args: &ArgMatches) -> Result<(), anyhow::E
let out = termcolor::StandardStream::stdout(term_color);
let mut out = out.lock();
- let mut rdr = res
- .bytes_stream()
- .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
- .into_async_read();
+ let mut rdr = res.bytes_stream().map_err(io::Error::other).into_async_read();
let mut line = String::new();
while rdr.read_line(&mut line).await? != 0 {
let record = serde_json::from_str::>(&line)?;
diff --git a/crates/cli/src/subcommands/publish.rs b/crates/cli/src/subcommands/publish.rs
index 7aff429affa..df63923eb59 100644
--- a/crates/cli/src/subcommands/publish.rs
+++ b/crates/cli/src/subcommands/publish.rs
@@ -128,7 +128,7 @@ pub async fn exec(mut config: Config, args: &ArgMatches) -> Result<(), anyhow::E
url.host_str().unwrap_or("").to_string()
};
if server_address != "localhost" && server_address != "127.0.0.1" {
- println!("You are about to publish to a non-local server: {}", server_address);
+ println!("You are about to publish to a non-local server: {server_address}");
if !y_or_n(force, "Are you sure you want to proceed?")? {
println!("Aborting");
return Ok(());
@@ -194,9 +194,9 @@ pub async fn exec(mut config: Config, args: &ArgMatches) -> Result<(), anyhow::E
PublishOp::Updated => "Updated",
};
if let Some(domain) = domain {
- println!("{} database with name: {}, identity: {}", op, domain, database_identity);
+ println!("{op} database with name: {domain}, identity: {database_identity}");
} else {
- println!("{} database with identity: {}", op, database_identity);
+ println!("{op} database with identity: {database_identity}");
}
}
PublishResult::PermissionDenied { name } => {
diff --git a/crates/cli/src/subcommands/server.rs b/crates/cli/src/subcommands/server.rs
index 611919a6385..19e36f32f68 100644
--- a/crates/cli/src/subcommands/server.rs
+++ b/crates/cli/src/subcommands/server.rs
@@ -17,8 +17,7 @@ pub fn cli() -> Command {
.subcommand_required(true)
.subcommands(get_subcommands())
.about(format!(
- "Manage the connection to the SpacetimeDB server. {}",
- UNSTABLE_WARNING
+ "Manage the connection to the SpacetimeDB server. {UNSTABLE_WARNING}"
))
}
@@ -116,7 +115,7 @@ fn get_subcommands() -> Vec {
pub async fn exec(config: Config, paths: &SpacetimePaths, args: &ArgMatches) -> Result<(), anyhow::Error> {
let (cmd, subcommand_args) = args.subcommand().expect("Subcommand required");
- eprintln!("{}\n", UNSTABLE_WARNING);
+ eprintln!("{UNSTABLE_WARNING}\n");
exec_subcommand(config, paths, cmd, subcommand_args).await
}
@@ -168,7 +167,7 @@ pub async fn exec_list(config: Config, _args: &ArgMatches) -> Result<(), anyhow:
table
.with(Style::empty())
.with(Modify::new(Columns::first()).with(Alignment::right()));
- println!("{}", table);
+ println!("{table}");
Ok(())
}
@@ -212,7 +211,7 @@ Add a server without retrieving its fingerprint with:
\tspacetime server add --url {url} --no-fingerprint",
)
})?;
- println!("For server {}, got fingerprint:\n{}", url, fingerprint);
+ println!("For server {url}, got fingerprint:\n{fingerprint}");
Some(fingerprint)
};
@@ -222,8 +221,8 @@ Add a server without retrieving its fingerprint with:
config.set_default_server(host)?;
}
- println!("Host: {}", host);
- println!("Protocol: {}", protocol);
+ println!("Host: {host}");
+ println!("Protocol: {protocol}");
config.save();
@@ -248,24 +247,18 @@ async fn update_server_fingerprint(config: &mut Config, server: Option<&str>) ->
.context("Error fetching server fingerprint")?;
if let Some(saved_fing) = config.server_fingerprint(server)? {
if saved_fing == new_fing {
- println!("Fingerprint is unchanged for server {}:\n{}", nick_or_host, saved_fing);
+ println!("Fingerprint is unchanged for server {nick_or_host}:\n{saved_fing}");
Ok(false)
} else {
- println!(
- "Fingerprint has changed for server {}.\nWas:\n{}\nNew:\n{}",
- nick_or_host, saved_fing, new_fing
- );
+ println!("Fingerprint has changed for server {nick_or_host}.\nWas:\n{saved_fing}\nNew:\n{new_fing}");
config.set_server_fingerprint(server, new_fing)?;
Ok(true)
}
} else {
- println!(
- "No saved fingerprint for server {}. New fingerprint:\n{}",
- nick_or_host, new_fing
- );
+ println!("No saved fingerprint for server {nick_or_host}. New fingerprint:\n{new_fing}");
config.set_server_fingerprint(server, new_fing)?;
@@ -292,18 +285,18 @@ pub async fn exec_ping(config: Config, args: &ArgMatches) -> Result<(), anyhow::
let server = args.get_one::("server").unwrap().as_str();
let url = config.get_host_url(Some(server))?;
- let builder = reqwest::Client::new().get(format!("{}/v1/ping", url).as_str());
+ let builder = reqwest::Client::new().get(format!("{url}/v1/ping").as_str());
let response = builder.send().await?;
match response.status() {
reqwest::StatusCode::OK => {
- println!("Server is online: {}", url);
+ println!("Server is online: {url}");
}
reqwest::StatusCode::NOT_FOUND => {
- println!("Server returned 404 (Not Found): {}", url);
+ println!("Server returned 404 (Not Found): {url}");
}
err => {
- println!("Server could not be reached ({}): {}", err, url);
+ println!("Server could not be reached ({err}): {url}");
}
}
Ok(())
@@ -336,13 +329,13 @@ pub async fn exec_edit(mut config: Config, args: &ArgMatches) -> Result<(), anyh
let server = new_nick.unwrap_or(server);
if let (Some(new_nick), Some(old_nick)) = (new_nick, old_nick) {
- println!("Changing nickname from {} to {}", old_nick, new_nick);
+ println!("Changing nickname from {old_nick} to {new_nick}");
}
if let (Some(new_host), Some(old_host)) = (new_host, old_host) {
- println!("Changing host from {} to {}", old_host, new_host);
+ println!("Changing host from {old_host} to {new_host}");
}
if let (Some(new_proto), Some(old_proto)) = (new_proto, old_proto) {
- println!("Changing protocol from {} to {}", old_proto, new_proto);
+ println!("Changing protocol from {old_proto} to {new_proto}");
}
let new_url = config.get_host_url(Some(server))?;
diff --git a/crates/cli/src/subcommands/sql.rs b/crates/cli/src/subcommands/sql.rs
index 8e6cac174db..5a79190a45b 100644
--- a/crates/cli/src/subcommands/sql.rs
+++ b/crates/cli/src/subcommands/sql.rs
@@ -14,7 +14,7 @@ use spacetimedb_lib::sats::{satn, ProductType, ProductValue, Typespace};
pub fn cli() -> clap::Command {
clap::Command::new("sql")
- .about(format!("Runs a SQL query on the database. {}", UNSTABLE_WARNING))
+ .about(format!("Runs a SQL query on the database. {UNSTABLE_WARNING}"))
.arg(
Arg::new("database")
.required(true)
@@ -129,7 +129,7 @@ fn print_stmt_result(
if let Some(with_stats) = with_stats {
f.write_char('\n')?;
- f.write_str(&format!("Roundtrip time: {:.2?}", with_stats))?;
+ f.write_str(&format!("Roundtrip time: {with_stats:.2?}"))?;
f.write_char('\n')?;
}
Ok(())
@@ -151,7 +151,7 @@ pub(crate) async fn run_sql(builder: RequestBuilder, sql: &str, with_stats: bool
let mut out = String::new();
print_stmt_result(&stmt_result_json, with_stats.then_some(now.elapsed()), &mut out)?;
- println!("{}", out);
+ println!("{out}");
Ok(())
}
@@ -170,7 +170,7 @@ fn stmt_result_to_table(stmt_result: &SqlStmtResult) -> anyhow::Result<(StmtStat
}
pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error> {
- eprintln!("{}\n", UNSTABLE_WARNING);
+ eprintln!("{UNSTABLE_WARNING}\n");
let interactive = args.get_one::("interactive").unwrap_or(&false);
if *interactive {
let con = parse_req(config, args).await?;
diff --git a/crates/cli/src/subcommands/subscribe.rs b/crates/cli/src/subcommands/subscribe.rs
index 6eb2039c787..5f61908ebfa 100644
--- a/crates/cli/src/subcommands/subscribe.rs
+++ b/crates/cli/src/subcommands/subscribe.rs
@@ -22,10 +22,7 @@ use crate::Config;
pub fn cli() -> clap::Command {
clap::Command::new("subscribe")
- .about(format!(
- "Subscribe to SQL queries on the database. {}",
- UNSTABLE_WARNING
- ))
+ .about(format!("Subscribe to SQL queries on the database. {UNSTABLE_WARNING}"))
.arg(
Arg::new("database")
.required(true)
@@ -125,7 +122,7 @@ struct SubscriptionTable {
}
pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error> {
- eprintln!("{}\n", UNSTABLE_WARNING);
+ eprintln!("{UNSTABLE_WARNING}\n");
let queries = args.get_many::("query").unwrap();
let num = args.get_one::("num-updates").copied();
diff --git a/crates/cli/src/util.rs b/crates/cli/src/util.rs
index 7eb9e8bee43..7161d4c9fec 100644
--- a/crates/cli/src/util.rs
+++ b/crates/cli/src/util.rs
@@ -124,7 +124,7 @@ pub async fn spacetime_dns(
}
pub async fn spacetime_server_fingerprint(url: &str) -> anyhow::Result {
- let builder = reqwest::Client::new().get(format!("{}/v1/identity/public-key", url).as_str());
+ let builder = reqwest::Client::new().get(format!("{url}/v1/identity/public-key").as_str());
let res = builder.send().await?.error_for_status()?;
let fingerprint = res.text().await?;
Ok(fingerprint)
@@ -261,7 +261,7 @@ pub fn y_or_n(force: bool, prompt: &str) -> anyhow::Result {
return Ok(true);
}
let mut input = String::new();
- print!("{} [y/N]", prompt);
+ print!("{prompt} [y/N]");
std::io::stdout().flush()?;
std::io::stdin().read_line(&mut input)?;
let input = input.trim().to_lowercase();
diff --git a/crates/client-api-messages/Cargo.toml b/crates/client-api-messages/Cargo.toml
index 90f767ee364..83c5b2ade97 100644
--- a/crates/client-api-messages/Cargo.toml
+++ b/crates/client-api-messages/Cargo.toml
@@ -4,6 +4,7 @@ version.workspace = true
edition.workspace = true
license-file = "LICENSE"
description = "Types for the SpacetimeDB client API messages"
+rust-version.workspace = true
[dependencies]
spacetimedb-lib = { workspace = true, features = ["serde"] }
@@ -29,3 +30,6 @@ hex.workspace = true
itertools.workspace = true
proptest.workspace = true
serde_json.workspace = true
+
+[lints]
+workspace = true
diff --git a/crates/client-api/Cargo.toml b/crates/client-api/Cargo.toml
index bd30f31da8d..f19b1495dd8 100644
--- a/crates/client-api/Cargo.toml
+++ b/crates/client-api/Cargo.toml
@@ -4,11 +4,13 @@ version.workspace = true
edition.workspace = true
license-file = "LICENSE"
description = "The HTTP API for SpacetimeDB"
+rust-version.workspace = true
[dependencies]
spacetimedb-client-api-messages.workspace = true
spacetimedb-core.workspace = true
spacetimedb-data-structures.workspace = true
+spacetimedb-datastore.workspace = true
spacetimedb-lib = { workspace = true, features = ["serde"] }
spacetimedb-paths.workspace = true
spacetimedb-schema.workspace = true
@@ -48,9 +50,16 @@ uuid.workspace = true
jsonwebtoken.workspace = true
scopeguard.workspace = true
serde_with.workspace = true
+async-stream.workspace = true
+humantime.workspace = true
[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemalloc_pprof.workspace = true
[dev-dependencies]
jsonwebtoken.workspace = true
+pretty_assertions = { workspace = true, features = ["unstable"] }
+toml.workspace = true
+
+[lints]
+workspace = true
diff --git a/crates/client-api/src/auth.rs b/crates/client-api/src/auth.rs
index 7437e6ef8cf..61031625867 100644
--- a/crates/client-api/src/auth.rs
+++ b/crates/client-api/src/auth.rs
@@ -310,12 +310,12 @@ impl IntoResponse for AuthorizationRejection {
// Sensible fallback if no auth header is present.
const REQUIRED: (StatusCode, &str) = (StatusCode::UNAUTHORIZED, "Authorization required");
- log::trace!("Authorization rejection: {:?}", self);
+ log::trace!("Authorization rejection: {self:?}");
match self {
AuthorizationRejection::Jwt(e) if *e.kind() == JwtErrorKind::InvalidSignature => ROTATED.into_response(),
AuthorizationRejection::Jwt(_) | AuthorizationRejection::Header(_) => INVALID.into_response(),
- AuthorizationRejection::Custom(msg) => (StatusCode::UNAUTHORIZED, format!("{:?}", msg)).into_response(),
+ AuthorizationRejection::Custom(msg) => (StatusCode::UNAUTHORIZED, format!("{msg:?}")).into_response(),
AuthorizationRejection::Required => REQUIRED.into_response(),
}
}
diff --git a/crates/client-api/src/lib.rs b/crates/client-api/src/lib.rs
index 37b120ed17c..ac82a3e142d 100644
--- a/crates/client-api/src/lib.rs
+++ b/crates/client-api/src/lib.rs
@@ -98,7 +98,7 @@ impl Host {
&mut header,
)
.map_err(|e| {
- log::warn!("{}", e);
+ log::warn!("{e}");
if let Some(auth_err) = e.get_auth_error() {
(StatusCode::UNAUTHORIZED, auth_err.to_string())
} else {
diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs
index 00f67783952..91a67f9c406 100644
--- a/crates/client-api/src/routes/database.rs
+++ b/crates/client-api/src/routes/database.rs
@@ -31,7 +31,7 @@ use spacetimedb_lib::db::raw_def::v9::RawModuleDefV9;
use spacetimedb_lib::identity::AuthCtx;
use spacetimedb_lib::{sats, Timestamp};
-use super::subscribe::handle_websocket;
+use super::subscribe::{handle_websocket, HasWebSocketOptions};
#[derive(Deserialize)]
pub struct CallParams {
@@ -120,16 +120,16 @@ pub async fn call(
}
ReducerCallError::NoSuchModule(_) | ReducerCallError::ScheduleReducerNotFound => StatusCode::NOT_FOUND,
ReducerCallError::NoSuchReducer => {
- log::debug!("Attempt to call non-existent reducer {}", reducer);
+ log::debug!("Attempt to call non-existent reducer {reducer}");
StatusCode::NOT_FOUND
}
ReducerCallError::LifecycleReducer(lifecycle) => {
- log::debug!("Attempt to call {lifecycle:?} lifecycle reducer {}", reducer);
+ log::debug!("Attempt to call {lifecycle:?} lifecycle reducer {reducer}");
StatusCode::BAD_REQUEST
}
};
- log::debug!("Error while invoking reducer {:#}", e);
+ log::debug!("Error while invoking reducer {e:#}");
Err((status_code, format!("{:#}", anyhow::anyhow!(e))))
}
};
@@ -164,11 +164,7 @@ fn reducer_outcome_response(identity: &Identity, reducer: &str, outcome: Reducer
(StatusCode::from_u16(530).unwrap(), errmsg)
}
ReducerOutcome::BudgetExceeded => {
- log::warn!(
- "Node's energy budget exceeded for identity: {} while executing {}",
- identity,
- reducer
- );
+ log::warn!("Node's energy budget exceeded for identity: {identity} while executing {reducer}");
(
StatusCode::PAYMENT_REQUIRED,
"Module energy budget exhausted.".to_owned(),
@@ -263,7 +259,7 @@ pub async fn db_info(
State(worker_ctx): State,
Path(DatabaseParam { name_or_identity }): Path,
) -> axum::response::Result {
- log::trace!("Trying to resolve database identity: {:?}", name_or_identity);
+ log::trace!("Trying to resolve database identity: {name_or_identity:?}");
let database_identity = name_or_identity.resolve(&worker_ctx).await?;
log::trace!("Resolved identity to: {database_identity:?}");
let database = worker_ctx_find_database(&worker_ctx, &database_identity)
@@ -794,7 +790,7 @@ pub struct DatabaseRoutes {
impl Default for DatabaseRoutes
where
- S: NodeDelegate + ControlStateDelegate + Clone + 'static,
+ S: NodeDelegate + ControlStateDelegate + HasWebSocketOptions + Clone + 'static,
{
fn default() -> Self {
use axum::routing::{delete, get, post, put};
diff --git a/crates/client-api/src/routes/energy.rs b/crates/client-api/src/routes/energy.rs
index 15c22560d9e..16e66963cf6 100644
--- a/crates/client-api/src/routes/energy.rs
+++ b/crates/client-api/src/routes/energy.rs
@@ -97,7 +97,7 @@ pub async fn set_energy_balance(
.map(|balance| balance.parse::())
.transpose()
.map_err(|err| {
- log::error!("Failed to parse balance: {:?}", err);
+ log::error!("Failed to parse balance: {err:?}");
StatusCode::BAD_REQUEST
})?
.unwrap_or(0);
diff --git a/crates/client-api/src/routes/identity.rs b/crates/client-api/src/routes/identity.rs
index e423267a8d4..69b27661fbe 100644
--- a/crates/client-api/src/routes/identity.rs
+++ b/crates/client-api/src/routes/identity.rs
@@ -79,7 +79,7 @@ pub async fn get_databases(
let identity = identity.into();
// Linear scan for all databases that have this owner, and return their identities
let all_dbs = ctx.get_databases().map_err(|e| {
- log::error!("Failure when retrieving databases for search: {}", e);
+ log::error!("Failure when retrieving databases for search: {e}");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let identities = all_dbs
diff --git a/crates/client-api/src/routes/internal.rs b/crates/client-api/src/routes/internal.rs
index c5819e72e23..b9faf10d391 100644
--- a/crates/client-api/src/routes/internal.rs
+++ b/crates/client-api/src/routes/internal.rs
@@ -91,7 +91,7 @@ mod jemalloc_profiling {
prof_ctl.activate().map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
- format!("Failed to activate heap profiling: {}", e),
+ format!("Failed to activate heap profiling: {e}"),
)
})?;
Ok(("Heap profiling activated").into_response())
@@ -99,7 +99,7 @@ mod jemalloc_profiling {
prof_ctl.deactivate().map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
- format!("Failed to deactivate heap profiling: {}", e),
+ format!("Failed to deactivate heap profiling: {e}"),
)
})?;
Ok(("Heap profiling deactivated").into_response())
diff --git a/crates/client-api/src/routes/metrics.rs b/crates/client-api/src/routes/metrics.rs
index 15737be587a..b946e8a0b9a 100644
--- a/crates/client-api/src/routes/metrics.rs
+++ b/crates/client-api/src/routes/metrics.rs
@@ -20,7 +20,7 @@ pub async fn metrics(State(ctx): State) -> axum::response::R
let mut encode_to_buffer = |mfs: &[_]| {
if let Err(e) = prometheus::TextEncoder.encode_utf8(mfs, &mut buf) {
- log::error!("could not encode custom metrics: {}", e);
+ log::error!("could not encode custom metrics: {e}");
}
};
diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs
index 85bd8acb42d..b1e94e1ea9a 100644
--- a/crates/client-api/src/routes/subscribe.rs
+++ b/crates/client-api/src/routes/subscribe.rs
@@ -1,37 +1,52 @@
-use std::mem;
+use std::fmt::Display;
+use std::future::{poll_fn, Future};
+use std::num::NonZeroUsize;
+use std::panic;
use std::pin::{pin, Pin};
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::task::{Context, Poll};
use std::time::Duration;
+use async_stream::stream;
use axum::extract::{Path, Query, State};
use axum::response::IntoResponse;
use axum::Extension;
use axum_extra::TypedHeader;
use bytes::Bytes;
use bytestring::ByteString;
-use futures::future::MaybeDone;
-use futures::{Future, FutureExt, SinkExt, StreamExt};
+use derive_more::From;
+use futures::{pin_mut, Sink, SinkExt, Stream, StreamExt};
use http::{HeaderValue, StatusCode};
-use scopeguard::ScopeGuard;
+use prometheus::IntGauge;
+use scopeguard::{defer, ScopeGuard};
use serde::Deserialize;
-use spacetimedb::client::messages::{serialize, IdentityTokenMessage, SerializableMessage, SerializeBuffer};
+use spacetimedb::client::messages::{
+ serialize, IdentityTokenMessage, SerializableMessage, SerializeBuffer, SwitchedServerMessage, ToProtocol,
+};
use spacetimedb::client::{
- ClientActorId, ClientConfig, ClientConnection, DataMessage, MessageHandleError, MeteredDeque, MeteredReceiver,
- Protocol,
+ ClientActorId, ClientConfig, ClientConnection, DataMessage, MessageExecutionError, MessageHandleError,
+ MeteredReceiver, Protocol,
};
-use spacetimedb::execution_context::WorkloadType;
use spacetimedb::host::module_host::ClientConnectedError;
use spacetimedb::host::NoSuchModule;
-use spacetimedb::util::also_poll;
+use spacetimedb::util::spawn_rayon;
use spacetimedb::worker_metrics::WORKER_METRICS;
use spacetimedb::Identity;
use spacetimedb_client_api_messages::websocket::{self as ws_api, Compression};
+use spacetimedb_datastore::execution_context::WorkloadType;
use spacetimedb_lib::connection_id::{ConnectionId, ConnectionIdForUrl};
use std::time::Instant;
+use tokio::sync::{mpsc, watch};
+use tokio::task::JoinHandle;
+use tokio::time::error::Elapsed;
+use tokio::time::{sleep_until, timeout};
use tokio_tungstenite::tungstenite::Utf8Bytes;
use crate::auth::SpacetimeAuth;
+use crate::util::serde::humantime_duration;
use crate::util::websocket::{
- CloseCode, CloseFrame, Message as WsMessage, WebSocketConfig, WebSocketStream, WebSocketUpgrade,
+ CloseCode, CloseFrame, Message as WsMessage, WebSocketConfig, WebSocketStream, WebSocketUpgrade, WsError,
};
use crate::util::{NameOrIdentity, XForwardedFor};
use crate::{log_and_500, ControlStateDelegate, NodeDelegate};
@@ -41,6 +56,16 @@ pub const TEXT_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_api::TEXT_PRO
#[allow(clippy::declare_interior_mutable_const)]
pub const BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_api::BIN_PROTOCOL);
+pub trait HasWebSocketOptions {
+ fn websocket_options(&self) -> WebSocketOptions;
+}
+
+impl HasWebSocketOptions for Arc {
+ fn websocket_options(&self) -> WebSocketOptions {
+ (**self).websocket_options()
+ }
+}
+
#[derive(Deserialize)]
pub struct SubscribeParams {
pub name_or_identity: NameOrIdentity,
@@ -74,7 +99,7 @@ pub async fn handle_websocket(
ws: WebSocketUpgrade,
) -> axum::response::Result
where
- S: NodeDelegate + ControlStateDelegate,
+ S: NodeDelegate + ControlStateDelegate + HasWebSocketOptions,
{
if connection_id.is_some() {
// TODO: Bump this up to `log::warn!` after removing the client SDKs' uses of that parameter.
@@ -132,24 +157,25 @@ where
.max_message_size(Some(0x2000000))
.max_frame_size(None)
.accept_unmasked_frames(false);
+ let ws_opts = ctx.websocket_options();
tokio::spawn(async move {
let ws = match ws_upgrade.upgrade(ws_config).await {
Ok(ws) => ws,
Err(err) => {
- log::error!("WebSocket init error: {}", err);
+ log::error!("WebSocket init error: {err}");
return;
}
};
match forwarded_for {
Some(TypedHeader(XForwardedFor(ip))) => {
- log::debug!("New client connected from ip {}", ip)
+ log::debug!("New client connected from ip {ip}")
}
None => log::debug!("New client connected from unknown ip"),
}
- let actor = |client, sendrx| ws_client_actor(client, ws, sendrx);
+ let actor = |client, sendrx| ws_client_actor(ws_opts, client, ws, sendrx);
let client = match ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor).await
{
Ok(s) => s,
@@ -181,356 +207,920 @@ where
Ok(res)
}
-const LIVELINESS_TIMEOUT: Duration = Duration::from_secs(60);
-const SEND_TIMEOUT: Duration = Duration::from_secs(5);
+struct ActorState {
+ pub client_id: ClientActorId,
+ pub database: Identity,
+ config: WebSocketOptions,
+ closed: AtomicBool,
+ got_pong: AtomicBool,
+}
+
+impl ActorState {
+ pub fn new(database: Identity, client_id: ClientActorId, config: WebSocketOptions) -> Self {
+ Self {
+ database,
+ client_id,
+ config,
+ closed: AtomicBool::new(false),
+ got_pong: AtomicBool::new(true),
+ }
+ }
+
+ pub fn closed(&self) -> bool {
+ self.closed.load(Ordering::Relaxed)
+ }
+
+ pub fn close(&self) -> bool {
+ self.closed.swap(true, Ordering::Relaxed)
+ }
-async fn ws_client_actor(client: ClientConnection, ws: WebSocketStream, sendrx: MeteredReceiver) {
+ pub fn set_ponged(&self) {
+ self.got_pong.store(true, Ordering::Relaxed);
+ }
+
+ pub fn reset_ponged(&self) -> bool {
+ self.got_pong.swap(false, Ordering::Relaxed)
+ }
+
+ pub fn next_idle_deadline(&self) -> Instant {
+ Instant::now() + self.config.idle_timeout
+ }
+}
+
+/// Configuration for WebSocket connections.
+#[derive(Clone, Copy, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub struct WebSocketOptions {
+ /// Interval at which to send `Ping` frames.
+ ///
+ /// We use pings for connection keep-alive.
+ /// Value must be smaller than `idle_timeout`.
+ ///
+ /// Default: 15s
+ #[serde(with = "humantime_duration")]
+ #[serde(default = "WebSocketOptions::default_ping_interval")]
+ pub ping_interval: Duration,
+ /// Amount of time after which an idle connection is closed.
+ ///
+ /// A connection is considered idle if no data is received nor sent.
+ /// This includes `Ping`/`Pong` frames used for keep-alive.
+ ///
+ /// Value must be greater than `ping_interval`.
+ ///
+ /// Default: 30s
+ #[serde(with = "humantime_duration")]
+ #[serde(default = "WebSocketOptions::default_idle_timeout")]
+ pub idle_timeout: Duration,
+ /// For how long to keep draining the incoming messages until a client close
+ /// is received.
+ ///
+ /// Default: 250ms
+ #[serde(with = "humantime_duration")]
+ #[serde(default = "WebSocketOptions::default_close_handshake_timeout")]
+ pub close_handshake_timeout: Duration,
+ /// Maximum number of messages to queue for processing.
+ ///
+ /// If this number is exceeded, the client is disconnected.
+ ///
+ /// Default: 2048
+ #[serde(default = "WebSocketOptions::default_incoming_queue_length")]
+ pub incoming_queue_length: NonZeroUsize,
+}
+
+impl Default for WebSocketOptions {
+ fn default() -> Self {
+ Self::DEFAULT
+ }
+}
+
+impl WebSocketOptions {
+ const DEFAULT_PING_INTERVAL: Duration = Duration::from_secs(15);
+ const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
+ const DEFAULT_CLOSE_HANDSHAKE_TIMEOUT: Duration = Duration::from_millis(250);
+ const DEFAULT_INCOMING_QUEUE_LENGTH: NonZeroUsize = NonZeroUsize::new(2048).expect("2048 > 0, qed");
+
+ const DEFAULT: Self = Self {
+ ping_interval: Self::DEFAULT_PING_INTERVAL,
+ idle_timeout: Self::DEFAULT_IDLE_TIMEOUT,
+ close_handshake_timeout: Self::DEFAULT_CLOSE_HANDSHAKE_TIMEOUT,
+ incoming_queue_length: Self::DEFAULT_INCOMING_QUEUE_LENGTH,
+ };
+
+ const fn default_ping_interval() -> Duration {
+ Self::DEFAULT_PING_INTERVAL
+ }
+
+ const fn default_idle_timeout() -> Duration {
+ Self::DEFAULT_IDLE_TIMEOUT
+ }
+
+ const fn default_close_handshake_timeout() -> Duration {
+ Self::DEFAULT_CLOSE_HANDSHAKE_TIMEOUT
+ }
+
+ const fn default_incoming_queue_length() -> NonZeroUsize {
+ Self::DEFAULT_INCOMING_QUEUE_LENGTH
+ }
+}
+
+async fn ws_client_actor(
+ options: WebSocketOptions,
+ client: ClientConnection,
+ ws: WebSocketStream,
+ sendrx: MeteredReceiver,
+) {
// ensure that even if this task gets cancelled, we always cleanup the connection
let mut client = scopeguard::guard(client, |client| {
tokio::spawn(client.disconnect());
});
- ws_client_actor_inner(&mut client, ws, sendrx).await;
+ ws_client_actor_inner(&mut client, options, ws, sendrx).await;
ScopeGuard::into_inner(client).disconnect().await;
}
-async fn make_progress(fut: &mut Pin<&mut MaybeDone>) {
- if let MaybeDone::Gone = **fut {
- // nothing to do
- } else {
- fut.await
- }
-}
-
async fn ws_client_actor_inner(
client: &mut ClientConnection,
- mut ws: WebSocketStream,
- mut sendrx: MeteredReceiver,
+ config: WebSocketOptions,
+ ws: WebSocketStream,
+ sendrx: MeteredReceiver,
) {
- let mut liveness_check_interval = tokio::time::interval(LIVELINESS_TIMEOUT);
- let mut got_pong = true;
+ let database = client.module.info().database_identity;
+ let client_id = client.id;
+ let client_closed_metric = WORKER_METRICS.ws_clients_closed_connection.with_label_values(&database);
+ let state = Arc::new(ActorState::new(database, client_id, config));
- let addr = client.module.info().database_identity;
+ // Channel for [`UnorderedWsMessage`]s.
+ let (unordered_tx, unordered_rx) = mpsc::unbounded_channel();
- // Build a queue of incoming messages to handle, to be processed one at a time,
- // in the order they're received.
- //
- // N.B. if you're refactoring this code: you must ensure the handle_queue is dropped before
- // client.disconnect() is called. Otherwise, we can be left with a stale future that's never
- // awaited, which can lead to bugs like:
- // https://rust-lang.github.io/wg-async/vision/submitted_stories/status_quo/aws_engineer/solving_a_deadlock.html
- //
- // NOTE: never let this go unpolled while you're awaiting something; otherwise, it's possible
- // to deadlock or delay for a long time. see usage of `also_poll()` in the branches of the
- // `select!` for examples of how to do this.
- //
- // TODO: do we want this to have a fixed capacity? or should it be unbounded
- let mut message_queue = MeteredDeque::<(DataMessage, Instant)>::new(
- WORKER_METRICS.total_incoming_queue_length.with_label_values(&addr),
- );
- let mut current_message = pin!(MaybeDone::Gone);
+ // Split websocket into send and receive halves.
+ let (ws_send, ws_recv) = ws.split();
- let mut closed = false;
- let mut rx_buf = Vec::new();
+ // Set up the idle timer.
+ let (idle_tx, idle_rx) = watch::channel(state.next_idle_deadline());
+ let idle_timer = ws_idle_timer(idle_rx);
- let mut msg_buffer = SerializeBuffer::new(client.config);
- loop {
- rx_buf.clear();
- enum Item {
- Message(ClientMessage),
- HandleResult(Result<(), MessageHandleError>),
- }
- if let MaybeDone::Gone = *current_message {
- if let Some((message, timer)) = message_queue.pop_front() {
+ // Spawn a task to send outgoing messages
+ // obtained from `sendrx` and `unordered_rx`.
+ let send_task = tokio::spawn(ws_send_loop(
+ state.clone(),
+ client.config,
+ ws_send,
+ sendrx,
+ unordered_rx,
+ ));
+ // Spawn a task to handle incoming messages.
+ let recv_task = tokio::spawn(ws_recv_task(
+ state.clone(),
+ idle_tx,
+ client_closed_metric,
+ {
+ let client = client.clone();
+ move |data, timer| {
let client = client.clone();
- let fut = async move { client.handle_message(message, timer).await };
- current_message.set(MaybeDone::Future(fut));
+ async move { client.handle_message(data, timer).await }
}
+ },
+ unordered_tx.clone(),
+ ws_recv,
+ ));
+ let hotswap = {
+ let client = client.clone();
+ move || {
+ let mut client = client.clone();
+ async move { client.watch_module_host().await }
}
- let message = tokio::select! {
- // NOTE: all of the futures for these branches **must** be cancel safe. do not
- // change this if you don't know what that means.
-
- // If we have a result from handling a past message to report,
- // grab it to handle in the next `match`.
- Some(res) = async {
- make_progress(&mut current_message).await;
- current_message.as_mut().take_output()
- } => {
- Item::HandleResult(res)
- }
+ };
- // If we've received an incoming message,
- // grab it to handle in the next `match`.
- message = ws.next() => match message {
- Some(Ok(m)) => Item::Message(ClientMessage::from_message(m)),
- Some(Err(error)) => {
- log::warn!("Websocket receive error: {}", error);
- break;
+ ws_main_loop(state, hotswap, idle_timer, send_task, recv_task, move |msg| {
+ let _ = unordered_tx.send(msg);
+ })
+ .await;
+ log::info!("Client connection ended: {client_id}");
+}
+
+/// The main `select!` loop of the websocket client actor.
+///
+/// > This function is defined standalone with generic parameters so that its
+/// > behavior can be tested in isolation, not requiring I/O and allowing to
+/// > mock effects easily.
+///
+/// The loop's responsibilities are:
+///
+/// - Drive the tasks handling the send and receive ends of the websockets to
+/// completion, terminating when either of them completes.
+///
+/// - Terminating if the connection is idle for longer than [`ActorConfig::idle_timeout`].
+/// The connection becomes idle if nothing is received from the socket.
+///
+/// - Periodically sending `Ping` frames to prevent the connection from becoming
+/// idle (the client is supposed to respond with `Pong`, which resets the
+/// idle timer). See [`ActorConfig::ping_interval`].
+///
+/// - Watch for changes to the [`ClientConnection`]'s module reference.
+/// If it changes, the [`ClientConnection`] "hotswaps" the module, if it
+/// is exited, the loop schedules a `Close` frame to be sent, initiating a
+/// connection shutdown.
+///
+/// A peculiarity of handling termination is the websocket [close handshake]:
+/// whichever side wants to close the connection sends a `Close` frame and needs
+/// to wait for the other end to respond with a `Close` for the connection to
+/// end cleanly.
+///
+/// `tungstenite` handles the protocol details of the close handshake for us,
+/// but for it to work properly, we must keep polling the socket until the
+/// handshake is complete.
+///
+/// This is straightforward when the client initiates the close, as the receive
+/// stream will just become exhausted, and we'll exit the loop.
+///
+/// In the case of a server-initiated close, it's a bit more tricky, as we're
+/// not supposed to send any more data after a `Close` frame (and `tungstenite`
+/// prevents it). Yet, we need to keep polling the receive end until either
+/// the `Close` response (which could be queued behind a large number of
+/// outstanding messages) arrives, or a timeout elapses (in case the client
+/// never responds).
+///
+/// The implementations [`ws_recv_loop`] and [`ws_send_loop`] thus share the
+/// [`ActorState`], which tracks whether the connection is in the closing phase
+/// ([`ActorState::closed()`]). If closed, both the send and receive loops keep
+/// running, but drop any incoming or outgoing messages respectively until
+/// either the `Close` response arrives or [`ActorConfig::close_handshake_timeout`]
+/// elapses.
+///
+///
+/// Parameters:
+///
+/// * **state**:
+/// The shared [`ActorState`], updated here when a `Pong` message is received.
+///
+/// * **hotswap**:
+/// An abstraction for [`ClientConnection::watch_module_host`], which updates
+/// the connection's internal reference to the module if it was updated,
+/// allowing database updates without disconnecting clients.
+///
+/// It is polled here for its error return value: if the output of the future
+/// is `Err(NoSuchModule)`, the database was shut down and existing clients
+/// must be disconnected.
+///
+/// * **idle_timer**:
+/// Abstraction for [`ws_idle_timer`]: if and when the future completes, the
+/// connection is considered unresponsive, and the connection is closed.
+///
+/// The idle timer should be reset whenever data is received from the websocket.
+///
+/// * **send_task**:
+/// Task handling outgoing messages. Holds the receive end of `unordered_tx`.
+///
+/// If the task returns, the connection is considered bad, and the main loop
+/// exits. If the task panicked, the panic is resumed on the current thread.
+///
+/// Note that the send task must not terminate after it has sent a `Close`
+/// frame (via `unordered_tx`) -- the websocket protocol mandates that the
+/// initiator of the close handshake wait for the other end to respond with
+/// a `Close` frame. Thus, the loop must continue to poll `recv_task` and not
+/// exit due to `send_task` being complete.
+///
+/// See [`ws_send_loop`].
+///
+/// * **recv_task**:
+/// Task handling incoming messages.
+///
+/// If the task returns, the connection is considered closed, and the main
+/// loop exits. If the task panicked, the panic is resumed on the current
+/// thread.
+///
+/// See [`ws_recv_task`].
+///
+/// * **unordered_tx**:
+/// Channel connected to `send_task` that allows the loop to send `Ping` and
+/// `Close` frames.
+///
+/// Note that messages sent while the receiving `send_task` is already
+/// terminated are silently ignored. This is safe because the loop will exit
+/// anyway when the `send_task` is complete.
+///
+///
+/// [close handshake]: https://datatracker.ietf.org/doc/html/rfc6455#section-7
+async fn ws_main_loop(
+ state: Arc,
+ hotswap: impl Fn() -> HotswapWatcher,
+ idle_timer: impl Future