Skip to content

Allow binaryData scan for single orga #8791

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions app/controllers/WKRemoteDataStoreController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -188,19 +188,22 @@ class WKRemoteDataStoreController @Inject()(
}
}

def updateAll(name: String, key: String): Action[List[InboxDataSource]] =
def updateAll(name: String, key: String, organizationId: Option[String]): Action[List[InboxDataSource]] =
Action.async(validateJson[List[InboxDataSource]]) { implicit request =>
dataStoreService.validateAccess(name, key) { dataStore =>
val dataSources = request.body
for {
before <- Instant.nowFox
selectedOrgaLabel = organizationId.map(id => s"for organization $id").getOrElse("for all organizations")
_ = logger.info(
s"Received dataset list from datastore '${dataStore.name}': " +
s"Received dataset list from datastore ${dataStore.name} $selectedOrgaLabel: " +
s"${dataSources.count(_.isUsable)} active, ${dataSources.count(!_.isUsable)} inactive")
existingIds <- datasetService.updateDataSources(dataStore, dataSources)(GlobalAccessContext)
_ <- datasetService.deactivateUnreportedDataSources(existingIds, dataStore)
_ <- datasetService.deactivateUnreportedDataSources(existingIds, dataStore, organizationId)
_ = if (Instant.since(before) > (30 seconds))
Instant.logSince(before, s"Updating datasources from datastore '${dataStore.name}'", logger)
Instant.logSince(before,
s"Updating datasources from datastore ${dataStore.name} $selectedOrgaLabel",
logger)
} yield Ok
}
}
Expand Down
6 changes: 4 additions & 2 deletions app/models/dataset/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -652,11 +652,13 @@ class DatasetDAO @Inject()(sqlClient: SqlClient, datasetLayerDAO: DatasetLayerDA

def deactivateUnreported(existingDatasetIds: List[ObjectId],
dataStoreName: String,
organizationId: Option[String],
unreportedStatus: String,
inactiveStatusList: List[String]): Fox[Unit] = {
val inSelectedOrga = organizationId.map(id => q"_organization = $id").getOrElse(q"TRUE")
val inclusionPredicate =
if (existingDatasetIds.isEmpty) q"TRUE"
else q"_id NOT IN ${SqlToken.tupleFromList(existingDatasetIds)}"
if (existingDatasetIds.isEmpty) inSelectedOrga
else q"_id NOT IN ${SqlToken.tupleFromList(existingDatasetIds)} AND $inSelectedOrga"
val statusNotAlreadyInactive = q"status NOT IN ${SqlToken.tupleFromList(inactiveStatusList)}"
val deleteMagsQuery =
q"""DELETE FROM webknossos.dataset_mags
Expand Down
10 changes: 8 additions & 2 deletions app/models/dataset/DatasetService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,14 @@ class DatasetService @Inject()(organizationDAO: OrganizationDAO,
}
} else Fox.successful(None)

def deactivateUnreportedDataSources(existingDatasetIds: List[ObjectId], dataStore: DataStore): Fox[Unit] =
datasetDAO.deactivateUnreported(existingDatasetIds, dataStore.name, unreportedStatus, inactiveStatusList)
def deactivateUnreportedDataSources(existingDatasetIds: List[ObjectId],
dataStore: DataStore,
organizationId: Option[String]): Fox[Unit] =
datasetDAO.deactivateUnreported(existingDatasetIds,
dataStore.name,
organizationId,
unreportedStatus,
inactiveStatusList)

def getSharingToken(datasetId: ObjectId)(implicit ctx: DBAccessContext): Fox[String] = {

Expand Down
2 changes: 1 addition & 1 deletion conf/webknossos.latest.routes
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ DELETE /folders/:id
# Datastores
GET /datastores controllers.DataStoreController.list()
PUT /datastores/:name/datasource controllers.WKRemoteDataStoreController.updateOne(name: String, key: String)
PUT /datastores/:name/datasources controllers.WKRemoteDataStoreController.updateAll(name: String, key: String)
PUT /datastores/:name/datasources controllers.WKRemoteDataStoreController.updateAll(name: String, key: String, organizationId: Option[String])
PUT /datastores/:name/datasources/paths controllers.WKRemoteDataStoreController.updatePaths(name: String, key: String)
GET /datastores/:name/datasources/:organizationId/:directoryName/paths controllers.WKRemoteDataStoreController.getPaths(name: String, key: String, organizationId: String, directoryName: String)
GET /datastores/:name/datasources/:datasetId controllers.WKRemoteDataStoreController.getDataSource(name: String, key: String, datasetId: ObjectId)
Expand Down
18 changes: 13 additions & 5 deletions frontend/javascripts/admin/rest_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1318,13 +1318,21 @@ export function updateDatasetTeams(
});
}

export async function triggerDatasetCheck(datastoreHost: string): Promise<void> {
await doWithToken((token) =>
Request.triggerRequest(`/data/triggers/checkInboxBlocking?token=${token}`, {
export async function triggerDatasetCheck(
datastoreHost: string,
organizationId?: string,
): Promise<void> {
await doWithToken((token) => {
const params = new URLSearchParams();
params.set("token", token);
if (organizationId) {
params.set("organizationId", organizationId);
}
return Request.triggerRequest(`/data/triggers/checkInboxBlocking?${params}`, {
host: datastoreHost,
method: "POST",
}),
);
});
});
}

export async function triggerDatasetClearCache(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export type DatasetCollectionContextValue = {
datasets: Array<APIDatasetCompact>;
isLoading: boolean;
isChecking: boolean;
checkDatasets: () => Promise<void>;
checkDatasets: (organizationId: string | undefined) => Promise<void>;
fetchDatasets: () => void;
reloadDataset: (datasetId: string, datasetsToUpdate?: Array<APIDatasetCompact>) => Promise<void>;
updateCachedDataset: (datasetId: string, updater: DatasetUpdater) => Promise<APIDataset>;
Expand Down Expand Up @@ -216,7 +216,7 @@ export default function DatasetCollectionContextProvider({
isChecking,
getBreadcrumbs,
getActiveSubfolders,
checkDatasets: async () => {
checkDatasets: async (organizationId: string | undefined) => {
if (isChecking) {
console.warn("Ignore second rechecking request, since a recheck is already in progress");
return;
Expand All @@ -230,7 +230,7 @@ export default function DatasetCollectionContextProvider({
) =>
// block the subsequent fetch of datasets. Otherwise, one offline
// datastore will stop the refresh for all datastores.
triggerDatasetCheck(datastore.url).catch(() => {}),
triggerDatasetCheck(datastore.url, organizationId).catch(() => {}),
),
);
setIsChecking(false);
Expand Down
4 changes: 3 additions & 1 deletion frontend/javascripts/dashboard/dataset_view.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import DatasetTable from "dashboard/advanced_dataset/dataset_table";
import dayjs from "dayjs";
import features from "features";
import Persistence from "libs/persistence";
import { useWkSelector } from "libs/react_hooks";
import * as Utils from "libs/utils";
import type { MenuProps } from "rc-menu";
import type React from "react";
Expand Down Expand Up @@ -312,13 +313,14 @@ function DatasetView({

export function DatasetRefreshButton({ context }: { context: DatasetCollectionContextValue }) {
const showLoadingIndicator = context.isLoading || context.isChecking;
const organizationId = useWkSelector((state) => state.activeOrganization?.id);

return (
<FastTooltip
title={showLoadingIndicator ? "Refreshing the dataset list." : "Refresh the dataset list."}
>
<Dropdown.Button
menu={{ onClick: context.checkDatasets, items: refreshMenuItems }}
menu={{ onClick: () => context.checkDatasets(organizationId), items: refreshMenuItems }}
style={{ marginRight: 5 }}
onClick={() => context.fetchDatasets()}
disabled={context.isChecking}
Expand Down
2 changes: 2 additions & 0 deletions unreleased_changes/8791.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
### Changed
- Explicitly scanning datasets on disk is now faster for multi-organization setups.
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,13 @@ class DataSourceController @Inject()(
}
}

def triggerInboxCheckBlocking(): Action[AnyContent] = Action.async { implicit request =>
accessTokenService.validateAccessFromTokenContext(UserAccessRequest.administrateDataSources) {
def triggerInboxCheckBlocking(organizationId: Option[String]): Action[AnyContent] = Action.async { implicit request =>
accessTokenService.validateAccessFromTokenContext(
organizationId
.map(id => UserAccessRequest.administrateDataSources(id))
.getOrElse(UserAccessRequest.administrateDataSources)) {
for {
_ <- dataSourceService.checkInbox(verbose = true)
_ <- dataSourceService.checkInbox(verbose = true, organizationId = organizationId)
} yield Ok
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,10 @@ class DSRemoteWebknossosClient @Inject()(
uploadedDatasetId <- JsonHelper.as[String](uploadedDatasetIdJson \ "id").toFox ?~> "uploadedDatasetId.invalid"
} yield uploadedDatasetId

def reportDataSources(dataSources: List[InboxDataSourceLike]): Fox[_] =
def reportDataSources(dataSources: List[InboxDataSourceLike], organizationId: Option[String]): Fox[_] =
rpc(s"$webknossosUri/api/datastores/$dataStoreName/datasources")
.addQueryString("key" -> dataStoreKey)
.addQueryStringOptional("organizationId", organizationId)
.silent
.putJson(dataSources)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ class DataSourceRepository @Inject()(
_ <- remoteWebknossosClient.reportDataSource(dataSource)
} yield ()

def updateDataSources(dataSources: List[InboxDataSource]): Fox[Unit] =
def updateDataSources(dataSources: List[InboxDataSource], organizationId: Option[String]): Fox[Unit] =
for {
_ <- Fox.successful(())
_ = removeAll()
_ = dataSources.foreach(dataSource => insert(dataSource.id, dataSource))
_ <- remoteWebknossosClient.reportDataSources(dataSources)
_ <- remoteWebknossosClient.reportDataSources(dataSources, organizationId)
} yield ()

def removeDataSource(dataSourceId: DataSourceId): Fox[Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class DataSourceService @Inject()(

def tick(): Fox[Unit] =
for {
_ <- checkInbox(verbose = inboxCheckVerboseCounter == 0)
_ <- checkInbox(verbose = inboxCheckVerboseCounter == 0, organizationId = None)
_ = inboxCheckVerboseCounter += 1
_ = if (inboxCheckVerboseCounter >= 10) inboxCheckVerboseCounter = 0
} yield ()
Expand All @@ -72,21 +72,24 @@ class DataSourceService @Inject()(

}

def checkInbox(verbose: Boolean): Fox[Unit] = {
if (verbose) logger.info(s"Scanning inbox ($dataBaseDir)...")
def checkInbox(verbose: Boolean, organizationId: Option[String]): Fox[Unit] = {
val selectedOrgaLabel = organizationId.map(id => s"/$id").getOrElse("")
def orgaFilterFn(organizationId: String): Path => Boolean =
(path: Path) => path.getFileName.toString == organizationId
val selectedOrgaFilter: Path => Boolean = organizationId.map(id => orgaFilterFn(id)).getOrElse((_: Path) => true)
if (verbose) logger.info(s"Scanning inbox ($dataBaseDir$selectedOrgaLabel)...")
for {
_ <- PathUtils.listDirectories(dataBaseDir, silent = false) match {
_ <- PathUtils.listDirectories(dataBaseDir, silent = false, filters = selectedOrgaFilter) match {
case Full(organizationDirs) =>
if (verbose && organizationId.isEmpty) logEmptyDirs(organizationDirs)
val foundInboxSources = organizationDirs.flatMap(scanOrganizationDirForDataSources)
logFoundDatasources(foundInboxSources, verbose, selectedOrgaLabel)
for {
_ <- Fox.successful(())
_ = if (verbose) logEmptyDirs(organizationDirs)
foundInboxSources = organizationDirs.flatMap(teamAwareInboxSources)
_ = logFoundDatasources(foundInboxSources, verbose)
_ <- dataSourceRepository.updateDataSources(foundInboxSources)
_ <- dataSourceRepository.updateDataSources(foundInboxSources, organizationId)
_ <- reportRealPaths(foundInboxSources)
} yield ()
case e =>
val errorMsg = s"Failed to scan inbox. Error during list directories on '$dataBaseDir': $e"
val errorMsg = s"Failed to scan inbox. Error during list directories on '$dataBaseDir$selectedOrgaLabel': $e"
logger.error(errorMsg)
Fox.failure(errorMsg)
}
Expand Down Expand Up @@ -169,9 +172,11 @@ class DataSourceService @Inject()(
basePath.resolve(relativePath).normalize().toAbsolutePath
}

private def logFoundDatasources(foundInboxSources: Seq[InboxDataSource], verbose: Boolean): Unit = {
private def logFoundDatasources(foundInboxSources: Seq[InboxDataSource],
verbose: Boolean,
selectedOrgaLabel: String): Unit = {
val shortForm =
s"Finished scanning inbox ($dataBaseDir): ${foundInboxSources.count(_.isUsable)} active, ${foundInboxSources
s"Finished scanning inbox ($dataBaseDir$selectedOrgaLabel): ${foundInboxSources.count(_.isUsable)} active, ${foundInboxSources
.count(!_.isUsable)} inactive"
val msg = if (verbose) {
val byTeam: Map[String, Seq[InboxDataSource]] = foundInboxSources.groupBy(_.id.organizationId)
Expand Down Expand Up @@ -307,7 +312,7 @@ class DataSourceService @Inject()(
}
}

private def teamAwareInboxSources(path: Path): List[InboxDataSource] = {
private def scanOrganizationDirForDataSources(path: Path): List[InboxDataSource] = {
val organization = path.getFileName.toString

PathUtils.listDirectories(path, silent = true) match {
Expand Down
2 changes: 1 addition & 1 deletion webknossos-datastore/conf/datastore.latest.routes
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ POST /datasets/exploreRemote
DELETE /wkDatasets/:datasetId @com.scalableminds.webknossos.datastore.controllers.DataSourceController.invalidateCache(datasetId: String)

# Actions
POST /triggers/checkInboxBlocking @com.scalableminds.webknossos.datastore.controllers.DataSourceController.triggerInboxCheckBlocking()
POST /triggers/checkInboxBlocking @com.scalableminds.webknossos.datastore.controllers.DataSourceController.triggerInboxCheckBlocking(organizationId: Option[String])
POST /triggers/createOrganizationDirectory @com.scalableminds.webknossos.datastore.controllers.DataSourceController.createOrganizationDirectory(organizationId: String)
POST /triggers/reload/:organizationId/:datasetDirectoryName @com.scalableminds.webknossos.datastore.controllers.DataSourceController.reload(organizationId: String, datasetDirectoryName: String, layerName: Option[String])

Expand Down