diff --git a/crates/ark/src/lsp/diagnostics.rs b/crates/ark/src/lsp/diagnostics.rs index e602e6469..c6a00e77e 100644 --- a/crates/ark/src/lsp/diagnostics.rs +++ b/crates/ark/src/lsp/diagnostics.rs @@ -30,6 +30,7 @@ use crate::lsp::encoding::convert_tree_sitter_range_to_lsp_range; use crate::lsp::indexer; use crate::lsp::inputs::library::Library; use crate::lsp::inputs::package::Package; +use crate::lsp::inputs::source_root::SourceRoot; use crate::lsp::state::WorldState; use crate::lsp::traits::node::NodeExt; use crate::lsp::traits::rope::RopeExt; @@ -62,6 +63,9 @@ pub struct DiagnosticContext<'a> { // The set of packages that are currently installed. pub installed_packages: HashSet, + /// Reference to source root, if any. + pub root: &'a Option, + /// Reference to the library for looking up package exports. pub library: &'a Library, @@ -83,13 +87,14 @@ impl Default for DiagnosticsConfig { } impl<'a> DiagnosticContext<'a> { - pub fn new(contents: &'a Rope, library: &'a Library) -> Self { + pub fn new(contents: &'a Rope, root: &'a Option, library: &'a Library) -> Self { Self { contents, document_symbols: Vec::new(), session_symbols: HashSet::new(), workspace_symbols: HashSet::new(), installed_packages: HashSet::new(), + root, library, library_symbols: BTreeMap::new(), in_formula: false, @@ -130,7 +135,11 @@ impl<'a> DiagnosticContext<'a> { } } -pub(crate) fn generate_diagnostics(doc: Document, state: WorldState) -> Vec { +pub(crate) fn generate_diagnostics( + doc: Document, + state: WorldState, + testthat: bool, +) -> Vec { let mut diagnostics = Vec::new(); if !state.config.diagnostics.enable { @@ -144,7 +153,7 @@ pub(crate) fn generate_diagnostics(doc: Document, state: WorldState) -> Vec Vec { context.workspace_symbols.insert(name.to_string()); }, + indexer::IndexEntryData::Variable { name } => { + context.workspace_symbols.insert(name.to_string()); + }, _ => {}, }); + // If this is a package, add imported symbols to workspace + if let 
Some(SourceRoot::Package(root)) = &state.root { + // Add symbols from `importFrom()` directives + for import in &root.namespace.imports { + context.workspace_symbols.insert(import.clone()); + } + + // Add symbols from `import()` directives + for package_import in &root.namespace.package_imports { + if let Some(pkg) = state.library.get(package_import) { + for export in &pkg.namespace.exports { + context.workspace_symbols.insert(export.clone()); + } + } + } + } + + // Simple workaround to include testthat exports in test files. I think the + // general principle would be that (a) files in `tests/testthat/` include + // `testthat.R` as a preamble (note that people modify that file e.g. to add + // more `library()` calls), and (b) all helper files are included in a + // test-specific workspace (which is effectively the case currently as we + // don't special-case how workspace inclusion works for packages). We might + // want to provide a mechanism for test packages to declare this sort of + // test files setup. + if testthat { + if let Some(pkg) = state.library.get("testthat") { + for export in &pkg.namespace.exports { + context.workspace_symbols.insert(export.clone()); + } + } + } + // Add per-environment session symbols for scope in state.console_scopes.iter() { for name in scope.iter() { @@ -1097,10 +1142,10 @@ mod tests { use harp::eval::RParseEvalOptions; use once_cell::sync::Lazy; + use tower_lsp::lsp_types; use tower_lsp::lsp_types::Position; use crate::interface::console_inputs; - use crate::lsp::diagnostics::generate_diagnostics; use crate::lsp::documents::Document; use crate::lsp::inputs::library::Library; use crate::lsp::inputs::package::Package; @@ -1113,6 +1158,10 @@ mod tests { // Default state that includes installed packages and default scopes. 
static DEFAULT_STATE: Lazy = Lazy::new(|| current_state()); + fn generate_diagnostics(doc: Document, state: WorldState) -> Vec { + super::generate_diagnostics(doc, state, false) + } + fn current_state() -> WorldState { let inputs = console_inputs().unwrap(); diff --git a/crates/ark/src/lsp/diagnostics_syntax.rs b/crates/ark/src/lsp/diagnostics_syntax.rs index aa9bef605..277aff831 100644 --- a/crates/ark/src/lsp/diagnostics_syntax.rs +++ b/crates/ark/src/lsp/diagnostics_syntax.rs @@ -314,7 +314,7 @@ mod tests { fn text_diagnostics(text: &str) -> Vec { let document = Document::new(text, None); let library = Library::default(); - let context = DiagnosticContext::new(&document.contents, &library); + let context = DiagnosticContext::new(&document.contents, &None, &library); let diagnostics = syntax_diagnostics(document.ast.root_node(), &context).unwrap(); diagnostics } diff --git a/crates/ark/src/lsp/inputs/library.rs b/crates/ark/src/lsp/inputs/library.rs index 3dc2e7afe..9fd00903c 100644 --- a/crates/ark/src/lsp/inputs/library.rs +++ b/crates/ark/src/lsp/inputs/library.rs @@ -68,7 +68,7 @@ impl Library { fn load_package(&self, name: &str) -> anyhow::Result> { for lib_path in self.library_paths.iter() { - match Package::load(&lib_path, name)? { + match Package::load_from_library(&lib_path, name)? { Some(pkg) => return Ok(Some(pkg)), None => (), } diff --git a/crates/ark/src/lsp/inputs/package.rs b/crates/ark/src/lsp/inputs/package.rs index e24dd2001..e0bf19b4d 100644 --- a/crates/ark/src/lsp/inputs/package.rs +++ b/crates/ark/src/lsp/inputs/package.rs @@ -23,15 +23,12 @@ pub struct Package { } impl Package { - /// Attempts to load a package from the given path and name. - pub fn load(lib_path: &std::path::Path, name: &str) -> anyhow::Result> { - let package_path = lib_path.join(name); - + /// Load a package from a given path. 
+ pub fn load_from_folder(package_path: &std::path::Path) -> anyhow::Result> { let description_path = package_path.join("DESCRIPTION"); let namespace_path = package_path.join("NAMESPACE"); - // Only consider libraries that have a folder named after the - // requested package and that contains a description file + // Only consider directories that contain a description file if !description_path.is_file() { return Ok(None); } @@ -41,24 +38,42 @@ impl Package { let description_contents = fs::read_to_string(&description_path)?; let description = Description::parse(&description_contents)?; - if description.name != name { - return Err(anyhow::anyhow!( - "`Package` field in `DESCRIPTION` doesn't match folder name '{name}'" - )); - } - let namespace = if namespace_path.is_file() { let namespace_contents = fs::read_to_string(&namespace_path)?; Namespace::parse(&namespace_contents)? } else { - tracing::info!("Package `{name}` doesn't contain a NAMESPACE file, using defaults"); + tracing::info!( + "Package `{name}` doesn't contain a NAMESPACE file, using defaults", + name = description.name + ); Namespace::default() }; Ok(Some(Package { - path: package_path, + path: package_path.to_path_buf(), description, namespace, })) } + + /// Load a package from the given library path and name. + pub fn load_from_library( + lib_path: &std::path::Path, + name: &str, + ) -> anyhow::Result> { + let package_path = lib_path.join(name); + + // For library packages, ensure the invariant that the package name + // matches the folder name + if let Some(pkg) = Self::load_from_folder(&package_path)? 
{ + if pkg.description.name != name { + return Err(anyhow::anyhow!( + "`Package` field in `DESCRIPTION` doesn't match folder name '{name}'" + )); + } + Ok(Some(pkg)) + } else { + Ok(None) + } + } } diff --git a/crates/ark/src/lsp/main_loop.rs b/crates/ark/src/lsp/main_loop.rs index a2bd2c40d..4f6485936 100644 --- a/crates/ark/src/lsp/main_loop.rs +++ b/crates/ark/src/lsp/main_loop.rs @@ -7,15 +7,20 @@ use std::collections::HashMap; use std::future; +use std::path::Path; use std::path::PathBuf; use std::pin::Pin; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; +use std::sync::LazyLock; use std::sync::RwLock; use anyhow::anyhow; +use futures::stream::FuturesUnordered; use futures::StreamExt; +use tokio::sync::mpsc; use tokio::sync::mpsc::unbounded_channel as tokio_unbounded_channel; +use tokio::task; use tokio::task::JoinHandle; use tower_lsp::lsp_types; use tower_lsp::lsp_types::Diagnostic; @@ -30,9 +35,10 @@ use crate::lsp::backend::LspNotification; use crate::lsp::backend::LspRequest; use crate::lsp::backend::LspResponse; use crate::lsp::capabilities::Capabilities; -use crate::lsp::diagnostics; +use crate::lsp::diagnostics::generate_diagnostics; use crate::lsp::documents::Document; use crate::lsp::handlers; +use crate::lsp::indexer; use crate::lsp::inputs::library::Library; use crate::lsp::state::WorldState; use crate::lsp::state_handlers; @@ -641,27 +647,6 @@ where send_auxiliary(AuxiliaryEvent::SpawnedTask(handle)); } -pub(crate) fn spawn_diagnostics_refresh(uri: Url, document: Document, state: WorldState) { - lsp::spawn_blocking(move || { - let _s = tracing::info_span!("diagnostics_refresh", uri = %uri).entered(); - - let version = document.version; - let diagnostics = diagnostics::generate_diagnostics(document, state); - - Ok(Some(AuxiliaryEvent::PublishDiagnostics( - uri, - diagnostics, - version, - ))) - }) -} - -pub(crate) fn spawn_diagnostics_refresh_all(state: WorldState) { - for (url, document) in state.documents.iter() { - 
spawn_diagnostics_refresh(url.clone(), document.clone(), state.clone()) - } -} - pub(crate) fn publish_diagnostics(uri: Url, diagnostics: Vec, version: Option) { send_auxiliary(AuxiliaryEvent::PublishDiagnostics( uri, @@ -692,3 +677,206 @@ impl std::fmt::Debug for TraceKernelNotification<'_> { } } } + +#[derive(Debug)] +pub(crate) enum IndexerQueueTask { + Indexer(IndexerTask), + Diagnostics(RefreshDiagnosticsTask), +} + +#[derive(Debug)] +pub enum IndexerTask { + Start { folders: Vec }, + Update { document: Document, uri: Url }, +} + +#[derive(Debug)] +pub(crate) struct RefreshDiagnosticsTask { + uri: Url, + state: WorldState, +} + +#[derive(Debug)] +struct RefreshDiagnosticsResult { + uri: Url, + diagnostics: Vec, + version: Option, +} + +static INDEXER_QUEUE: LazyLock> = + LazyLock::new(|| { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + tokio::spawn(process_indexer_queue(rx)); + tx + }); + +/// Process indexer and diagnostics tasks +/// +/// Diagnostics need an up-to-date index to be accurate, so we synchronise +/// indexing and diagnostics tasks using a simple queue. +/// +/// - We make sure to refresh diagnostics after every indexer updates. +/// - Indexer tasks are batched together, same for diagnostics tasks. +/// - Cancellation is simply dealt with by deduplicating tasks for the same URI, +/// retaining only the most recent one. +/// +/// Ideally we'd process indexer tasks continually without making them dependent +/// on diagnostics tasks. The current setup blocks the queue loop while +/// diagnostics are running, but it has the benefit that rounds of diagnostic +/// refreshes don't race against each other. The frontend will receive all +/// results in order, ensuring that diagnostics for an outdated version are +/// eventually replaced by the most up-to-date diagnostics. +/// +/// Note that this setup will be entirely replaced in the future by Salsa +/// dependencies. 
Diagnostics refreshes will depend on indexer results in a +/// natural way and they will be cancelled automatically as document updates +/// arrive. +async fn process_indexer_queue(mut rx: mpsc::UnboundedReceiver) { + let mut diagnostics_batch = Vec::new(); + let mut indexer_batch = Vec::new(); + + while let Some(task) = rx.recv().await { + let mut tasks = vec![task]; + + // Process diagnostics at least every 10 iterations if indexer tasks + // keep coming in, so the user gets intermediate diagnostics refreshes + for _ in 0..10 { + while let Ok(task) = rx.try_recv() { + tasks.push(task); + } + + // Separate by type + for task in std::mem::take(&mut tasks) { + match task { + IndexerQueueTask::Indexer(indexer_task) => indexer_batch.push(indexer_task), + IndexerQueueTask::Diagnostics(diagnostic_task) => { + diagnostics_batch.push(diagnostic_task) + }, + } + } + + // No more indexer tasks, let's do diagnostics + if indexer_batch.is_empty() { + break; + } + + // Process indexer tasks first so diagnostics tasks work with an + // up-to-date index + process_indexer_batch(std::mem::take(&mut indexer_batch)).await; + } + + process_diagnostics_batch(std::mem::take(&mut diagnostics_batch)).await; + } +} + +async fn process_indexer_batch(batch: Vec) { + // Deduplicate tasks by key. We use a `HashMap` so only the last insertion + // is retained. `Update` tasks use URI as key, `Start` tasks use None (we + // only expect one though). This is effectively a way of cancelling `Update` + // tasks for outdated documents. + let batch: std::collections::HashMap<_, _> = batch + .into_iter() + .map(|task| match &task { + IndexerTask::Update { uri, .. } => (Some(uri.clone()), task), + IndexerTask::Start { .. 
} => (None, task), + }) + .collect(); + + let mut handles = Vec::new(); + + for (_, task) in batch { + handles.push(tokio::task::spawn_blocking(move || match task { + IndexerTask::Start { folders } => { + indexer::start(folders); + }, + IndexerTask::Update { document, uri } => { + let result = if let Ok(path) = uri.to_file_path() { + indexer::update(&document, &path) + } else { + Err(anyhow!("Failed to convert URI to file path: {uri}")) + }; + if let Err(err) = result { + log::error!("Indexer update failed: {err}"); + } + }, + })); + } + + for handle in handles { + let _ = handle.await; + } +} + +async fn process_diagnostics_batch(batch: Vec) { + // Deduplicate tasks by keeping only the last one for each URI. We use a + // `HashMap` so only the last insertion is retained. This is effectively a + // way of cancelling diagnostics tasks for outdated documents. + let batch: std::collections::HashMap<_, _> = batch + .into_iter() + .map(|task| (task.uri, task.state)) + .collect(); + + let mut futures = FuturesUnordered::new(); + + for (uri, state) in batch { + futures.push(task::spawn_blocking(move || { + let _span = tracing::info_span!("diagnostics_refresh", uri = %uri).entered(); + + if let Some(document) = state.documents.get(&uri) { + // Special case testthat-specific behaviour. This is a simple + // stopgap approach that has some false positives (e.g. when we + // work on testthat itself the flag will always be true), but + // that shouldn't have much practical impact. 
+ let testthat = Path::new(uri.path()) + .components() + .any(|c| c.as_os_str() == "testthat"); + + let diagnostics = generate_diagnostics(document.clone(), state.clone(), testthat); + Some(RefreshDiagnosticsResult { + uri, + diagnostics, + version: document.version, + }) + } else { + None + } + })); + } + + // Publish results as they complete + while let Some(Ok(Some(result))) = futures.next().await { + publish_diagnostics(result.uri, result.diagnostics, result.version); + } +} + +pub(crate) fn index_start(folders: Vec, state: WorldState) { + INDEXER_QUEUE + .send(IndexerQueueTask::Indexer(IndexerTask::Start { folders })) + .unwrap_or_else(|err| lsp::log_error!("Failed to queue initial indexing: {err}")); + + diagnostics_refresh_all(state); +} + +pub(crate) fn index_update(uri: Url, document: Document, state: WorldState) { + INDEXER_QUEUE + .send(IndexerQueueTask::Indexer(IndexerTask::Update { + document, + uri: uri.clone(), + })) + .unwrap_or_else(|err| lsp::log_error!("Failed to queue index update: {err}")); + + // Refresh all diagnostics since the indexer results for one file may affect + // other files + diagnostics_refresh_all(state); +} + +pub(crate) fn diagnostics_refresh_all(state: WorldState) { + for (uri, _document) in state.documents.iter() { + INDEXER_QUEUE + .send(IndexerQueueTask::Diagnostics(RefreshDiagnosticsTask { + uri: uri.clone(), + state: state.clone(), + })) + .unwrap_or_else(|err| lsp::log_error!("Failed to queue diagnostics refresh: {err}")); + } +} diff --git a/crates/ark/src/lsp/mod.rs b/crates/ark/src/lsp/mod.rs index cdf230b4e..4e6f1f9e9 100644 --- a/crates/ark/src/lsp/mod.rs +++ b/crates/ark/src/lsp/mod.rs @@ -63,7 +63,6 @@ pub(crate) use _log; pub(crate) use log_error; pub(crate) use log_info; pub(crate) use log_warn; +pub(crate) use main_loop::diagnostics_refresh_all; pub(crate) use main_loop::publish_diagnostics; pub(crate) use main_loop::spawn_blocking; -pub(crate) use main_loop::spawn_diagnostics_refresh; -pub(crate) use 
main_loop::spawn_diagnostics_refresh_all; diff --git a/crates/ark/src/lsp/state.rs b/crates/ark/src/lsp/state.rs index 12f53c947..0d1d1d03c 100644 --- a/crates/ark/src/lsp/state.rs +++ b/crates/ark/src/lsp/state.rs @@ -51,7 +51,7 @@ pub(crate) struct WorldState { pub(crate) installed_packages: Vec, /// The root of the source tree (e.g., a package). - pub(crate) _root: Option, + pub(crate) root: Option, /// Map of package name to package metadata for installed libraries. Lazily populated. pub(crate) library: Library, diff --git a/crates/ark/src/lsp/state_handlers.rs b/crates/ark/src/lsp/state_handlers.rs index 2fb24ac55..7d050de9c 100644 --- a/crates/ark/src/lsp/state_handlers.rs +++ b/crates/ark/src/lsp/state_handlers.rs @@ -5,8 +5,6 @@ // // -use std::path::Path; - use anyhow::anyhow; use tower_lsp::lsp_types; use tower_lsp::lsp_types::CompletionOptions; @@ -44,7 +42,8 @@ use crate::lsp::config::DOCUMENT_SETTINGS; use crate::lsp::config::GLOBAL_SETTINGS; use crate::lsp::documents::Document; use crate::lsp::encoding::get_position_encoding_kind; -use crate::lsp::indexer; +use crate::lsp::inputs::package::Package; +use crate::lsp::inputs::source_root::SourceRoot; use crate::lsp::main_loop::DidCloseVirtualDocumentParams; use crate::lsp::main_loop::DidOpenVirtualDocumentParams; use crate::lsp::main_loop::LspState; @@ -84,18 +83,42 @@ pub(crate) fn initialize( for folder in workspace_folders.iter() { state.workspace.folders.push(folder.uri.clone()); if let Ok(path) = folder.uri.to_file_path() { - if let Some(path) = path.to_str() { - folders.push(path.to_string()); + // Try to load package from this workspace folder and set as + // root if found. This means we're dealing with a package + // source. 
+ if state.root.is_none() { + match Package::load_from_folder(&path) { + Ok(Some(pkg)) => { + log::info!( + "Root: Loaded package `{pkg}` from {path} as project root", + pkg = pkg.description.name, + path = path.display() + ); + state.root = Some(SourceRoot::Package(pkg)); + }, + Ok(None) => { + log::info!( + "Root: No package found at {path}, treating as folder of scripts", + path = path.display() + ); + }, + Err(err) => { + log::warn!( + "Root: Error loading package at {path}: {err}", + path = path.display() + ); + }, + } + } + if let Some(path_str) = path.to_str() { + folders.push(path_str.to_string()); } } } } // Start first round of indexing - lsp::spawn_blocking(|| { - indexer::start(folders); - Ok(None) - }); + lsp::main_loop::index_start(folders, state.clone()); Ok(InitializeResult { server_info: Some(ServerInfo { @@ -177,8 +200,7 @@ pub(crate) fn did_open( // NOTE: Do we need to call `update_config()` here? // update_config(vec![uri]).await; - update_index(&uri, &document); - lsp::spawn_diagnostics_refresh(uri, document, state.clone()); + lsp::main_loop::index_update(uri.clone(), document.clone(), state.clone()); Ok(()) } @@ -190,17 +212,16 @@ pub(crate) fn did_change( state: &mut WorldState, ) -> anyhow::Result<()> { let uri = &params.text_document.uri; - let doc = state.get_document_mut(uri)?; + let document = state.get_document_mut(uri)?; let mut parser = lsp_state .parsers .get_mut(uri) .ok_or(anyhow!("No parser for {uri}"))?; - doc.on_did_change(&mut parser, &params); + document.on_did_change(&mut parser, &params); - update_index(uri, doc); - lsp::spawn_diagnostics_refresh(uri.clone(), doc.clone(), state.clone()); + lsp::main_loop::index_update(uri.clone(), document.clone(), state.clone()); Ok(()) } @@ -355,7 +376,7 @@ async fn update_config( // Refresh diagnostics if the configuration changed if state.config.diagnostics != diagnostics_config { tracing::info!("Refreshing diagnostics after configuration changed"); -
lsp::spawn_diagnostics_refresh_all(state.clone()); + lsp::main_loop::diagnostics_refresh_all(state.clone()); } Ok(()) @@ -373,7 +394,7 @@ pub(crate) fn did_change_console_inputs( // during package development in conjunction with `devtools::load_all()`. // Ideally diagnostics would not rely on these though, and we wouldn't need // to refresh from here. - lsp::spawn_diagnostics_refresh_all(state.clone()); + lsp::diagnostics_refresh_all(state.clone()); Ok(()) } @@ -396,16 +417,3 @@ pub(crate) fn did_close_virtual_document( state.virtual_documents.remove(&params.uri); Ok(()) } - -// FIXME: The initial indexer is currently racing against our state notification -// handlers. The indexer is synchronised through a mutex but we might end up in -// a weird state. Eventually the index should be moved to WorldState and created -// on demand with Salsa instrumenting and cancellation. -fn update_index(uri: &url::Url, doc: &Document) { - if let Ok(path) = uri.to_file_path() { - let path = Path::new(&path); - if let Err(err) = indexer::update(&doc, &path) { - lsp::log_error!("{err:?}"); - } - } -}