Skip to content

Commit 56cfbd6

Browse files
Experiment/dependencies graph (#364)
* Simplest startup job to initialize dependencies graph. * Removed dependency query from DatasetRepository. * Integrated dependencies graph into GraphQL queries for upstream/downstream links. * Integrated dependencies graph into dataset deletion. * Reacting on `DatasetCreated` events. * Implemented reaction of dependencies graph on changes in dataset inputs * Implemented lazy vs eager dependencies initialization
1 parent f798407 commit 56cfbd6

File tree

54 files changed

+1199
-555
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+1199
-555
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/adapter/auth-oso/tests/tests/test_oso_dataset_authorizer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::sync::Arc;
1414
use dill::Component;
1515
use event_bus::EventBus;
1616
use kamu::testing::MetadataFactory;
17-
use kamu::DatasetRepositoryLocalFs;
17+
use kamu::{DatasetRepositoryLocalFs, DependencyGraphServiceInMemory};
1818
use kamu_adapter_auth_oso::{KamuAuthOso, OsoDatasetAuthorizer};
1919
use kamu_core::auth::{DatasetAction, DatasetActionAuthorizer, DatasetActionUnauthorizedError};
2020
use kamu_core::{AccessError, CurrentAccountSubject, DatasetRepository};
@@ -111,6 +111,7 @@ impl DatasetAuthorizerHarness {
111111
)))
112112
.add::<KamuAuthOso>()
113113
.add::<OsoDatasetAuthorizer>()
114+
.add::<DependencyGraphServiceInMemory>()
114115
.add_builder(
115116
DatasetRepositoryLocalFs::builder()
116117
.with_root(datasets_dir)

src/adapter/graphql/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ kamu-data-utils = { workspace = true }
2525
kamu-core = { workspace = true }
2626
kamu-task-system = { workspace = true }
2727
kamu-flow-system = { workspace = true }
28+
event-bus = { workspace = true }
2829

2930
async-graphql = { version = "6", features = ["chrono", "url", "apollo_tracing"] }
3031
async-trait = { version = "0.1", default-features = false }
@@ -36,6 +37,7 @@ indoc = "2"
3637
serde = "1"
3738
serde_json = "1"
3839
tokio = { version = "1", default-features = false, features = [] }
40+
tokio-stream = { version = "0.1", default-features = false }
3941
tracing = "0.1"
4042
thiserror = { version = "1", default-features = false }
4143

src/adapter/graphql/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#![feature(error_generic_member_access)]
1111
#![feature(error_in_core)]
1212
#![feature(int_roundings)]
13+
#![feature(async_closure)]
1314

1415
pub mod extensions;
1516
pub(crate) mod mutations;

src/adapter/graphql/src/queries/datasets/dataset_metadata.rs

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
// by the Apache License, Version 2.0.
99

1010
use chrono::prelude::*;
11-
use futures::TryStreamExt;
1211
use kamu_core::{self as domain, MetadataChainExt, TryStreamExtExt};
1312
use opendatafabric as odf;
1413
use opendatafabric::{AsTypedBlock, VariantOf};
@@ -93,39 +92,59 @@ impl DatasetMetadata {
9392

9493
/// Current upstream dependencies of a dataset
9594
async fn current_upstream_dependencies(&self, ctx: &Context<'_>) -> Result<Vec<Dataset>> {
96-
let dataset_repo = from_catalog::<dyn domain::DatasetRepository>(ctx).unwrap();
95+
let dependency_graph_service =
96+
from_catalog::<dyn domain::DependencyGraphService>(ctx).unwrap();
9797

98-
let dataset = self.get_dataset(ctx).await?;
99-
let summary = dataset
100-
.get_summary(domain::GetSummaryOpts::default())
98+
use tokio_stream::StreamExt;
99+
let upstream_dataset_ids: Vec<_> = dependency_graph_service
100+
.get_upstream_dependencies(&self.dataset_handle.id)
101101
.await
102-
.int_err()?;
102+
.int_err()?
103+
.collect()
104+
.await;
103105

104-
let mut dependencies: Vec<_> = Vec::new();
105-
for input in summary.dependencies.into_iter() {
106-
let dataset_id = input.id.unwrap().clone();
107-
let dataset_handle = dataset_repo
108-
.resolve_dataset_ref(&dataset_id.as_local_ref())
106+
let dataset_repo = from_catalog::<dyn domain::DatasetRepository>(ctx).unwrap();
107+
let mut upstream = Vec::with_capacity(upstream_dataset_ids.len());
108+
for upstream_dataset_id in upstream_dataset_ids {
109+
let hdl = dataset_repo
110+
.resolve_dataset_ref(&upstream_dataset_id.as_local_ref())
109111
.await
110112
.int_err()?;
111-
dependencies.push(Dataset::new(
112-
Account::from_dataset_alias(ctx, &dataset_handle.alias),
113-
dataset_handle,
113+
upstream.push(Dataset::new(
114+
Account::from_dataset_alias(ctx, &hdl.alias),
115+
hdl,
114116
));
115117
}
116-
Ok(dependencies)
118+
119+
Ok(upstream)
117120
}
118121

119122
// TODO: Convert to collection
120123
/// Current downstream dependencies of a dataset
121124
async fn current_downstream_dependencies(&self, ctx: &Context<'_>) -> Result<Vec<Dataset>> {
122-
let dataset_repo = from_catalog::<dyn domain::DatasetRepository>(ctx).unwrap();
125+
let dependency_graph_service =
126+
from_catalog::<dyn domain::DependencyGraphService>(ctx).unwrap();
123127

124-
let downstream: Vec<_> = dataset_repo
125-
.get_downstream_dependencies(&self.dataset_handle.as_local_ref())
126-
.map_ok(|hdl| Dataset::new(Account::from_dataset_alias(ctx, &hdl.alias), hdl))
127-
.try_collect()
128-
.await?;
128+
use tokio_stream::StreamExt;
129+
let downstream_dataset_ids: Vec<_> = dependency_graph_service
130+
.get_downstream_dependencies(&self.dataset_handle.id)
131+
.await
132+
.int_err()?
133+
.collect()
134+
.await;
135+
136+
let dataset_repo = from_catalog::<dyn domain::DatasetRepository>(ctx).unwrap();
137+
let mut downstream = Vec::with_capacity(downstream_dataset_ids.len());
138+
for downstream_dataset_id in downstream_dataset_ids {
139+
let hdl = dataset_repo
140+
.resolve_dataset_ref(&downstream_dataset_id.as_local_ref())
141+
.await
142+
.int_err()?;
143+
downstream.push(Dataset::new(
144+
Account::from_dataset_alias(ctx, &hdl.alias),
145+
hdl,
146+
));
147+
}
129148

130149
Ok(downstream)
131150
}

src/adapter/graphql/tests/tests/test_error_handling.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ async fn test_internal_error() {
6161
.add::<EventBus>()
6262
.add_value(CurrentAccountSubject::new_test())
6363
.add::<auth::AlwaysHappyDatasetActionAuthorizer>()
64+
.add::<DependencyGraphServiceInMemory>()
6465
.add_builder(
6566
DatasetRepositoryLocalFs::builder()
6667
.with_root(tempdir.path().join("datasets"))

src/adapter/graphql/tests/tests/test_gql_data.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use opendatafabric::*;
2525
async fn create_catalog_with_local_workspace(tempdir: &Path) -> dill::Catalog {
2626
dill::CatalogBuilder::new()
2727
.add::<EventBus>()
28+
.add::<DependencyGraphServiceInMemory>()
2829
.add_builder(
2930
DatasetRepositoryLocalFs::builder()
3031
.with_root(tempdir.join("datasets"))

src/adapter/graphql/tests/tests/test_gql_datasets.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,7 @@ async fn dataset_rename_name_collision() {
377377
#[test_log::test(tokio::test)]
378378
async fn dataset_delete_success() {
379379
let harness = GraphQLDatasetsHarness::new();
380+
harness.init_dependencies_graph().await;
380381

381382
let foo_result = harness
382383
.create_root_dataset(DatasetName::new_unchecked("foo"))
@@ -426,6 +427,7 @@ async fn dataset_delete_success() {
426427
#[test_log::test(tokio::test)]
427428
async fn dataset_delete_dangling_ref() {
428429
let harness = GraphQLDatasetsHarness::new();
430+
harness.init_dependencies_graph().await;
429431

430432
let foo_result = harness
431433
.create_root_dataset(DatasetName::new_unchecked("foo"))
@@ -531,7 +533,7 @@ async fn dataset_view_permissions() {
531533

532534
struct GraphQLDatasetsHarness {
533535
_tempdir: tempfile::TempDir,
534-
_base_catalog: dill::Catalog,
536+
base_catalog: dill::Catalog,
535537
catalog_authorized: dill::Catalog,
536538
catalog_anonymous: dill::Catalog,
537539
}
@@ -544,6 +546,7 @@ impl GraphQLDatasetsHarness {
544546

545547
let base_catalog = dill::CatalogBuilder::new()
546548
.add::<EventBus>()
549+
.add::<DependencyGraphServiceInMemory>()
547550
.add_builder(
548551
DatasetRepositoryLocalFs::builder()
549552
.with_root(datasets_dir)
@@ -559,12 +562,27 @@ impl GraphQLDatasetsHarness {
559562

560563
Self {
561564
_tempdir: tempdir,
562-
_base_catalog: base_catalog,
565+
base_catalog,
563566
catalog_anonymous,
564567
catalog_authorized,
565568
}
566569
}
567570

571+
pub async fn init_dependencies_graph(&self) {
572+
let dataset_repo = self
573+
.catalog_authorized
574+
.get_one::<dyn DatasetRepository>()
575+
.unwrap();
576+
let dependency_graph_service = self
577+
.base_catalog
578+
.get_one::<dyn DependencyGraphService>()
579+
.unwrap();
580+
dependency_graph_service
581+
.eager_initialization(&DependencyGraphRepositoryInMemory::new(dataset_repo))
582+
.await
583+
.unwrap();
584+
}
585+
568586
pub async fn create_root_dataset(&self, name: DatasetName) -> CreateDatasetResult {
569587
let dataset_repo = self
570588
.catalog_authorized

src/adapter/graphql/tests/tests/test_gql_metadata_chain.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ async fn metadata_chain_append_event() {
2929

3030
let base_catalog = dill::CatalogBuilder::new()
3131
.add::<EventBus>()
32+
.add::<DependencyGraphServiceInMemory>()
3233
.add_builder(
3334
DatasetRepositoryLocalFs::builder()
3435
.with_root(tempdir.path().join("datasets"))
@@ -125,6 +126,7 @@ async fn metadata_update_readme_new() {
125126

126127
let base_catalog = dill::CatalogBuilder::new()
127128
.add::<EventBus>()
129+
.add::<DependencyGraphServiceInMemory>()
128130
.add_builder(
129131
DatasetRepositoryLocalFs::builder()
130132
.with_root(tempdir.path().join("datasets"))

src/adapter/graphql/tests/tests/test_gql_search.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ async fn query() {
2121

2222
let cat = dill::CatalogBuilder::new()
2323
.add::<EventBus>()
24+
.add::<DependencyGraphServiceInMemory>()
2425
.add_value(CurrentAccountSubject::new_test())
2526
.add::<auth::AlwaysHappyDatasetActionAuthorizer>()
2627
.add_builder(

0 commit comments

Comments
 (0)