diff --git a/unreleased_changes/8717.md b/unreleased_changes/8717.md new file mode 100644 index 00000000000..add2acb305b --- /dev/null +++ b/unreleased_changes/8717.md @@ -0,0 +1,2 @@ +### Added +- Connectomes can now also be read from the new zarr3-based format, and from remote object storage. diff --git a/util/src/main/scala/com/scalableminds/util/collections/SequenceUtils.scala b/util/src/main/scala/com/scalableminds/util/collections/SequenceUtils.scala index 1ce0f7ed496..3856deeac5c 100644 --- a/util/src/main/scala/com/scalableminds/util/collections/SequenceUtils.scala +++ b/util/src/main/scala/com/scalableminds/util/collections/SequenceUtils.scala @@ -1,5 +1,7 @@ package com.scalableminds.util.collections +import scala.collection.Searching.{Found, InsertionPoint} + object SequenceUtils { def findUniqueElement[T](list: Seq[T]): Option[T] = { val uniqueElements = list.distinct @@ -51,4 +53,11 @@ object SequenceUtils { val batchTo = Math.min(to, (batchIndex + 1) * batchSize + from - 1) (batchFrom, batchTo) } + + // Search in a sorted array, returns the index where the element is found or, if missing, the index where it would be inserted + def searchSorted(haystack: Array[Long], needle: Long): Int = + haystack.search(needle) match { + case Found(i) => i + case InsertionPoint(i) => i + } } diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/DataStoreModule.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/DataStoreModule.scala index daa4cc9c927..4ff41ea6eaa 100644 --- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/DataStoreModule.scala +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/DataStoreModule.scala @@ -4,6 +4,11 @@ import org.apache.pekko.actor.ActorSystem import com.google.inject.AbstractModule import com.google.inject.name.Names import com.scalableminds.webknossos.datastore.services._ +import com.scalableminds.webknossos.datastore.services.connectome.{ + ConnectomeFileService, + Hdf5ConnectomeFileService, + ZarrConnectomeFileService +} import com.scalableminds.webknossos.datastore.services.mapping.{ AgglomerateService, Hdf5AgglomerateService, @@ -52,6 +57,9 @@ class DataStoreModule extends AbstractModule { bind(classOf[SegmentIndexFileService]).asEagerSingleton() bind(classOf[ZarrSegmentIndexFileService]).asEagerSingleton() bind(classOf[Hdf5SegmentIndexFileService]).asEagerSingleton() + bind(classOf[ConnectomeFileService]).asEagerSingleton() + bind(classOf[ZarrConnectomeFileService]).asEagerSingleton() + bind(classOf[Hdf5ConnectomeFileService]).asEagerSingleton() bind(classOf[NeuroglancerPrecomputedMeshFileService]).asEagerSingleton() bind(classOf[RemoteSourceDescriptorService]).asEagerSingleton() bind(classOf[ChunkCacheService]).asEagerSingleton() diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/controllers/DataSourceController.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/controllers/DataSourceController.scala index 5ea67e147d6..f55638b663e 100644 --- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/controllers/DataSourceController.scala +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/controllers/DataSourceController.scala @@ -25,6 +25,12 @@ import com.scalableminds.webknossos.datastore.services.uploading._ import com.scalableminds.webknossos.datastore.storage.DataVaultService import com.scalableminds.util.tools.Box.tryo import com.scalableminds.util.tools.{Box, Empty, Failure, Full} +import 
com.scalableminds.webknossos.datastore.services.connectome.{ + ByAgglomerateIdsRequest, + BySynapseIdsRequest, + ConnectomeFileService, + SynapticPartnerDirection +} import com.scalableminds.webknossos.datastore.services.mapping.AgglomerateService import play.api.data.Form import play.api.data.Forms.{longNumber, nonEmptyText, number, tuple} @@ -452,6 +458,8 @@ class DataSourceController @Inject()( meshFileService.clearCache(dataSourceId, layerName) val closedSegmentIndexFileHandleCount = segmentIndexFileService.clearCache(dataSourceId, layerName) + val closedConnectomeFileHandleCount = + connectomeFileService.clearCache(dataSourceId, layerName) val reloadedDataSource: InboxDataSource = dataSourceService.dataSourceFromDir( dataSourceService.dataBaseDir.resolve(organizationId).resolve(datasetDirectoryName), organizationId) @@ -459,7 +467,7 @@ class DataSourceController @Inject()( val clearedVaultCacheEntriesOpt = dataSourceService.invalidateVaultCache(reloadedDataSource, layerName) clearedVaultCacheEntriesOpt.foreach { clearedVaultCacheEntries => logger.info( - s"Cleared caches for ${layerName.map(l => s"layer '$l' of ").getOrElse("")}dataset $organizationId/$datasetDirectoryName: closed $closedAgglomerateFileHandleCount agglomerate file handles, $closedMeshFileHandleCount mesh file handles, $closedSegmentIndexFileHandleCount segment index file handles, removed $clearedBucketProviderCount bucketProviders, $clearedVaultCacheEntries vault cache entries and $removedChunksCount image chunk cache entries.") + s"Cleared caches for ${layerName.map(l => s"layer '$l' of ").getOrElse("")}dataset $organizationId/$datasetDirectoryName: closed $closedAgglomerateFileHandleCount agglomerate file handles, $closedMeshFileHandleCount mesh file handles, $closedSegmentIndexFileHandleCount segment index file handles, $closedConnectomeFileHandleCount connectome file handles, removed $clearedBucketProviderCount bucketProviders, $clearedVaultCacheEntries vault cache entries and $removedChunksCount image chunk cache entries.") } reloadedDataSource } @@ -510,21 +518,12 @@ class DataSourceController @Inject()( Action.async { implicit request => accessTokenService.validateAccessFromTokenContext( UserAccessRequest.readDataSources(DataSourceId(datasetDirectoryName, organizationId))) { - val connectomeFileNames = - connectomeFileService.exploreConnectomeFiles(organizationId, datasetDirectoryName, dataLayerName) for { - mappingNames <- Fox.serialCombined(connectomeFileNames.toList) { connectomeFileName => - val path = - connectomeFileService.connectomeFilePath(organizationId, - datasetDirectoryName, - dataLayerName, - connectomeFileName) - connectomeFileService.mappingNameForConnectomeFile(path) - } - connectomesWithMappings = connectomeFileNames - .zip(mappingNames) - .map(tuple => ConnectomeFileNameWithMappingName(tuple._1, tuple._2)) - } yield Ok(Json.toJson(connectomesWithMappings)) + (dataSource, dataLayer) <- dataSourceRepository.getDataSourceAndDataLayer(organizationId, + datasetDirectoryName, + dataLayerName) + connectomeFileInfos <- connectomeFileService.listConnectomeFiles(dataSource.id, dataLayer) + } yield Ok(Json.toJson(connectomeFileInfos)) } } @@ -535,10 +534,13 @@ class DataSourceController @Inject()( accessTokenService.validateAccessFromTokenContext( UserAccessRequest.readDataSources(DataSourceId(datasetDirectoryName, organizationId))) { for { - meshFilePath <- Fox.successful( - connectomeFileService - .connectomeFilePath(organizationId, datasetDirectoryName, dataLayerName, request.body.connectomeFile)) - 
synapses <- connectomeFileService.synapsesForAgglomerates(meshFilePath, request.body.agglomerateIds) + (dataSource, dataLayer) <- dataSourceRepository.getDataSourceAndDataLayer(organizationId, + datasetDirectoryName, + dataLayerName) + meshFileKey <- connectomeFileService.lookUpConnectomeFileKey(dataSource.id, + dataLayer, + request.body.connectomeFile) + synapses <- connectomeFileService.synapsesForAgglomerates(meshFileKey, request.body.agglomerateIds) } yield Ok(Json.toJson(synapses)) } } @@ -551,12 +553,18 @@ class DataSourceController @Inject()( accessTokenService.validateAccessFromTokenContext( UserAccessRequest.readDataSources(DataSourceId(datasetDirectoryName, organizationId))) { for { - meshFilePath <- Fox.successful( - connectomeFileService - .connectomeFilePath(organizationId, datasetDirectoryName, dataLayerName, request.body.connectomeFile)) - agglomerateIds <- connectomeFileService.synapticPartnerForSynapses(meshFilePath, + directionValidated <- SynapticPartnerDirection + .fromString(direction) + .toFox ?~> "could not parse synaptic partner direction" + (dataSource, dataLayer) <- dataSourceRepository.getDataSourceAndDataLayer(organizationId, + datasetDirectoryName, + dataLayerName) + meshFileKey <- connectomeFileService.lookUpConnectomeFileKey(dataSource.id, + dataLayer, + request.body.connectomeFile) + agglomerateIds <- connectomeFileService.synapticPartnerForSynapses(meshFileKey, request.body.synapseIds, - direction) + directionValidated) } yield Ok(Json.toJson(agglomerateIds)) } } @@ -568,10 +576,13 @@ class DataSourceController @Inject()( accessTokenService.validateAccessFromTokenContext( UserAccessRequest.readDataSources(DataSourceId(datasetDirectoryName, organizationId))) { for { - meshFilePath <- Fox.successful( - connectomeFileService - .connectomeFilePath(organizationId, datasetDirectoryName, dataLayerName, request.body.connectomeFile)) - synapsePositions <- connectomeFileService.positionsForSynapses(meshFilePath, request.body.synapseIds) + (dataSource, dataLayer) <- dataSourceRepository.getDataSourceAndDataLayer(organizationId, + datasetDirectoryName, + dataLayerName) + meshFileKey <- connectomeFileService.lookUpConnectomeFileKey(dataSource.id, + dataLayer, + request.body.connectomeFile) + synapsePositions <- connectomeFileService.positionsForSynapses(meshFileKey, request.body.synapseIds) } yield Ok(Json.toJson(synapsePositions)) } } @@ -583,10 +594,13 @@ class DataSourceController @Inject()( accessTokenService.validateAccessFromTokenContext( UserAccessRequest.readDataSources(DataSourceId(datasetDirectoryName, organizationId))) { for { - meshFilePath <- Fox.successful( - connectomeFileService - .connectomeFilePath(organizationId, datasetDirectoryName, dataLayerName, request.body.connectomeFile)) - synapseTypes <- connectomeFileService.typesForSynapses(meshFilePath, request.body.synapseIds) + (dataSource, dataLayer) <- dataSourceRepository.getDataSourceAndDataLayer(organizationId, + datasetDirectoryName, + dataLayerName) + meshFileKey <- connectomeFileService.lookUpConnectomeFileKey(dataSource.id, + dataLayer, + request.body.connectomeFile) + synapseTypes <- connectomeFileService.typesForSynapses(meshFileKey, request.body.synapseIds) } yield Ok(Json.toJson(synapseTypes)) } } diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datareaders/DatasetArray.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datareaders/DatasetArray.scala index 53be6aa9b5e..aa70429c853 100644 --- 
a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datareaders/DatasetArray.scala +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datareaders/DatasetArray.scala @@ -193,6 +193,8 @@ class DatasetArray(vaultPath: VaultPath, tc: TokenContext): Fox[MultiArray] = if (shape.contains(0)) { Fox.successful(MultiArrayUtils.createEmpty(header.resolvedDataType, rank)) + } else if (shape.exists(_ < 0)) { + Fox.failure(s"Trying to read negative shape from DatasetArray: ${shape.mkString(",")}") } else { val totalOffset: Array[Long] = offset.zip(header.voxelOffset).map { case (o, v) => o - v }.padTo(offset.length, 0) val chunkIndices = ChunkUtils.computeChunkIndices(datasetShape, chunkShape, shape, totalOffset) diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/ConnectomeFileService.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/ConnectomeFileService.scala deleted file mode 100644 index 3a24c199568..00000000000 --- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/ConnectomeFileService.scala +++ /dev/null @@ -1,309 +0,0 @@ -package com.scalableminds.webknossos.datastore.services - -import java.io.File -import java.nio.file.{Path, Paths} -import com.scalableminds.util.io.PathUtils -import com.scalableminds.util.tools.{Fox, JsonHelper, FoxImplicits} -import com.scalableminds.webknossos.datastore.DataStoreConfig -import com.scalableminds.webknossos.datastore.storage.{CachedHdf5File, Hdf5FileCache} -import com.typesafe.scalalogging.LazyLogging - -import javax.inject.Inject -import com.scalableminds.util.tools.Full -import com.scalableminds.util.tools.Box.tryo -import org.apache.commons.io.FilenameUtils -import play.api.libs.json.{Json, OFormat} - -import scala.collection.Searching._ -import scala.collection.mutable.ListBuffer -import scala.concurrent.ExecutionContext - -case class ByAgglomerateIdsRequest( - connectomeFile: String, - agglomerateIds: List[Long] -) - -object ByAgglomerateIdsRequest { - implicit val jsonFormat: OFormat[ByAgglomerateIdsRequest] = Json.format[ByAgglomerateIdsRequest] -} - -case class BySynapseIdsRequest( - connectomeFile: String, - synapseIds: List[Long] -) - -object BySynapseIdsRequest { - implicit val jsonFormat: OFormat[BySynapseIdsRequest] = Json.format[BySynapseIdsRequest] -} - -case class DirectedSynapseList( - in: List[Long], - out: List[Long] -) - -object DirectedSynapseList { - implicit val jsonFormat: OFormat[DirectedSynapseList] = Json.format[DirectedSynapseList] -} - -case class DirectedSynapseListMutable( - in: ListBuffer[Long], - out: ListBuffer[Long] -) { - def freeze: DirectedSynapseList = DirectedSynapseList(in.toList, out.toList) -} - -object DirectedSynapseListMutable { - def empty: DirectedSynapseListMutable = DirectedSynapseListMutable(ListBuffer(), ListBuffer()) -} - -case class SynapseTypesWithLegend( - synapseTypes: List[Long], - typeToString: List[String], -) - -object SynapseTypesWithLegend { - implicit val jsonFormat: OFormat[SynapseTypesWithLegend] = Json.format[SynapseTypesWithLegend] -} - -case class ConnectomeFileNameWithMappingName( - connectomeFileName: String, - mappingName: String -) - -object ConnectomeFileNameWithMappingName { - implicit val jsonFormat: OFormat[ConnectomeFileNameWithMappingName] = Json.format[ConnectomeFileNameWithMappingName] -} - -case class ConnectomeLegend(synapse_type_names: List[String]) - -object ConnectomeLegend { - implicit val jsonFormat: OFormat[ConnectomeLegend] = 
Json.format[ConnectomeLegend] -} - -class ConnectomeFileService @Inject()(config: DataStoreConfig)(implicit ec: ExecutionContext) - extends FoxImplicits - with LazyLogging { - - private val dataBaseDir = Paths.get(config.Datastore.baseDirectory) - private val connectomesDir = "connectomes" - private val connectomeFileExtension = "hdf5" - - private lazy val connectomeFileCache = new Hdf5FileCache(30) - - def exploreConnectomeFiles(organizationId: String, - datasetDirectoryName: String, - dataLayerName: String): Set[String] = { - val layerDir = dataBaseDir.resolve(organizationId).resolve(datasetDirectoryName).resolve(dataLayerName) - PathUtils - .listFiles(layerDir.resolve(connectomesDir), - silent = true, - PathUtils.fileExtensionFilter(connectomeFileExtension)) - .map { paths => - paths.map(path => FilenameUtils.removeExtension(path.getFileName.toString)) - } - .toOption - .getOrElse(Nil) - .toSet - } - - def connectomeFilePath(organizationId: String, - datasetDirectoryName: String, - dataLayerName: String, - connectomeFileName: String): Path = - dataBaseDir - .resolve(organizationId) - .resolve(datasetDirectoryName) - .resolve(dataLayerName) - .resolve(connectomesDir) - .resolve(s"$connectomeFileName.$connectomeFileExtension") - - def mappingNameForConnectomeFile(connectomeFilePath: Path): Fox[String] = - for { - cachedConnectomeFile <- tryo { - connectomeFileCache.getCachedHdf5File(connectomeFilePath)(CachedHdf5File.fromPath) - }.toFox ?~> "connectome.file.open.failed" - mappingName <- finishAccessOnFailure(cachedConnectomeFile) { - cachedConnectomeFile.stringReader.getAttr("/", "metadata/mapping_name") - } ?~> "connectome.file.readEncoding.failed" - _ = cachedConnectomeFile.finishAccess() - } yield mappingName - - def synapsesForAgglomerates(connectomeFilePath: Path, agglomerateIds: List[Long]): Fox[List[DirectedSynapseList]] = - if (agglomerateIds.length == 1) { - for { - agglomerateId <- agglomerateIds.headOption.toFox ?~> "Failed to extract the single agglomerate ID from request" - inSynapses <- ingoingSynapsesForAgglomerate(connectomeFilePath, agglomerateId) ?~> "Failed to read ingoing synapses" - outSynapses <- outgoingSynapsesForAgglomerate(connectomeFilePath, agglomerateId) ?~> "Failed to read outgoing synapses" - } yield List(DirectedSynapseList(inSynapses, outSynapses)) - } else { - val agglomeratePairs = directedPairs(agglomerateIds.toSet.toList) - for { - synapsesPerPair <- Fox.serialCombined(agglomeratePairs)(pair => - synapseIdsForDirectedPair(connectomeFilePath, pair._1, pair._2)) - synapseListsMap = gatherPairSynapseLists(agglomerateIds, agglomeratePairs, synapsesPerPair) - synapseListsOrdered = agglomerateIds.map(id => synapseListsMap(id)) - } yield synapseListsOrdered - } - - private def directedPairs(items: List[Long]): List[(Long, Long)] = - (for { x <- items; y <- items } yield (x, y)).filter(pair => pair._1 != pair._2) - - private def gatherPairSynapseLists(agglomerateIds: List[Long], - agglomeratePairs: List[(Long, Long)], - synapsesPerPair: List[List[Long]]): collection.Map[Long, DirectedSynapseList] = { - val directedSynapseListsMutable = scala.collection.mutable.Map[Long, DirectedSynapseListMutable]() - agglomerateIds.foreach { agglomerateId => - directedSynapseListsMutable(agglomerateId) = DirectedSynapseListMutable.empty - } - agglomeratePairs.zip(synapsesPerPair).foreach { pairWithSynapses: ((Long, Long), List[Long]) => - val srcAgglomerate = pairWithSynapses._1._1 - val dstAgglomerate = pairWithSynapses._1._2 - directedSynapseListsMutable(srcAgglomerate).out 
++= pairWithSynapses._2 - directedSynapseListsMutable(dstAgglomerate).in ++= pairWithSynapses._2 - } - directedSynapseListsMutable.view.mapValues(_.freeze).toMap - } - - private def ingoingSynapsesForAgglomerate(connectomeFilePath: Path, agglomerateId: Long): Fox[List[Long]] = - for { - cachedConnectomeFile <- tryo { - connectomeFileCache.getCachedHdf5File(connectomeFilePath)(CachedHdf5File.fromPath) - }.toFox ?~> "connectome.file.open.failed" - fromAndToPtr: Array[Long] <- finishAccessOnFailure(cachedConnectomeFile) { - cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset("/CSC_indptr", 2, agglomerateId) - } ?~> "Could not read offsets from connectome file" - from <- fromAndToPtr.lift(0).toFox ?~> "Could not read start offset from connectome file" - to <- fromAndToPtr.lift(1).toFox ?~> "Could not read end offset from connectome file" - // readArrayBlockWithOffset has a bug and does not return the empty array when block size 0 is passed, hence the if. - agglomeratePairs: Array[Long] <- if (to - from == 0L) Fox.successful(Array.empty[Long]) - else - finishAccessOnFailure(cachedConnectomeFile) { - cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset("/CSC_agglomerate_pair", (to - from).toInt, from) - } ?~> "Could not read agglomerate pairs from connectome file" - synapseIdsNested <- Fox.serialCombined(agglomeratePairs.toList) { agglomeratePair: Long => - for { - from <- finishAccessOnFailure(cachedConnectomeFile) { - cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset("/agglomerate_pair_offsets", 1, agglomeratePair) - }.flatMap(_.headOption.toFox) - to <- finishAccessOnFailure(cachedConnectomeFile) { - cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset("/agglomerate_pair_offsets", - 1, - agglomeratePair + 1) - }.flatMap(_.headOption.toFox) - } yield List.range(from, to) - } ?~> "Could not read ingoing synapses from connectome file" - _ = cachedConnectomeFile.finishAccess() - } yield synapseIdsNested.flatten - - private def outgoingSynapsesForAgglomerate(connectomeFilePath: Path, agglomerateId: Long): Fox[List[Long]] = - for { - cachedConnectomeFile <- tryo { - connectomeFileCache.getCachedHdf5File(connectomeFilePath)(CachedHdf5File.fromPath) - }.toFox ?~> "connectome.file.open.failed" - fromAndToPtr: Array[Long] <- finishAccessOnFailure(cachedConnectomeFile) { - cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset("/CSR_indptr", 2, agglomerateId) - } ?~> "Could not read offsets from connectome file" - fromPtr <- fromAndToPtr.lift(0).toFox ?~> "Could not read start offset from connectome file" - toPtr <- fromAndToPtr.lift(1).toFox ?~> "Could not read end offset from connectome file" - from <- finishAccessOnFailure(cachedConnectomeFile) { - cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset("/agglomerate_pair_offsets", 1, fromPtr) - }.flatMap(_.headOption.toFox) ?~> "Could not synapses from connectome file" - to <- finishAccessOnFailure(cachedConnectomeFile) { - cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset("/agglomerate_pair_offsets", 1, toPtr) - }.flatMap(_.headOption.toFox) ?~> "Could not synapses from connectome file" - } yield List.range(from, to) - - def synapticPartnerForSynapses(connectomeFilePath: Path, synapseIds: List[Long], direction: String): Fox[List[Long]] = - for { - _ <- Fox.fromBool(direction == "src" || direction == "dst") ?~> s"Invalid synaptic partner direction: $direction" - collection = s"/synapse_to_${direction}_agglomerate" - cachedConnectomeFile <- tryo { - 
connectomeFileCache.getCachedHdf5File(connectomeFilePath)(CachedHdf5File.fromPath) - }.toFox ?~> "connectome.file.open.failed" - agglomerateIds <- Fox.serialCombined(synapseIds) { synapseId: Long => - finishAccessOnFailure(cachedConnectomeFile) { - cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset(collection, 1, synapseId) - }.flatMap(_.headOption.toFox) - } - } yield agglomerateIds - - def positionsForSynapses(connectomeFilePath: Path, synapseIds: List[Long]): Fox[List[List[Long]]] = - for { - cachedConnectomeFile <- tryo { - connectomeFileCache.getCachedHdf5File(connectomeFilePath)(CachedHdf5File.fromPath) - }.toFox ?~> "connectome.file.open.failed" - synapsePositions <- Fox.serialCombined(synapseIds) { synapseId: Long => - finishAccessOnFailure(cachedConnectomeFile) { - cachedConnectomeFile.uint64Reader.readMatrixBlockWithOffset("/synapse_positions", 1, 3, synapseId, 0) - }.flatMap(_.headOption.toFox) - } - } yield synapsePositions.map(_.toList) - - def typesForSynapses(connectomeFilePath: Path, synapseIds: List[Long]): Fox[SynapseTypesWithLegend] = - for { - cachedConnectomeFile <- tryo { - connectomeFileCache.getCachedHdf5File(connectomeFilePath)(CachedHdf5File.fromPath) - }.toFox ?~> "connectome.file.open.failed" - typeNames = typeNamesForSynapsesOrEmpty(connectomeFilePath) - synapseTypes <- Fox.serialCombined(synapseIds) { synapseId: Long => - finishAccessOnFailure(cachedConnectomeFile) { - cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset("/synapse_types", 1, synapseId) - }.flatMap(_.headOption.toFox) - } - } yield SynapseTypesWithLegend(synapseTypes, typeNames) - - private def typeNamesForSynapsesOrEmpty(connectomeFilePath: Path): List[String] = { - val typeNamesPath = Paths.get(s"${connectomeFilePath.toString.dropRight(connectomeFileExtension.length)}json") - if (new File(typeNamesPath.toString).exists()) { - JsonHelper.parseFromFileAs[ConnectomeLegend](typeNamesPath, typeNamesPath.getParent) match { - case Full(connectomeLegend) => connectomeLegend.synapse_type_names - case _ => List.empty - } - } else List.empty - } - - private def synapseIdsForDirectedPair(connectomeFilePath: Path, - srcAgglomerateId: Long, - dstAgglomerateId: Long): Fox[List[Long]] = - for { - cachedConnectomeFile <- tryo { - connectomeFileCache.getCachedHdf5File(connectomeFilePath)(CachedHdf5File.fromPath) - }.toFox ?~> "connectome.file.open.failed" - fromAndToPtr: Array[Long] <- finishAccessOnFailure(cachedConnectomeFile) { - cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset("/CSR_indptr", 2, srcAgglomerateId) - } ?~> "Could not read offsets from connectome file" - fromPtr <- fromAndToPtr.lift(0).toFox ?~> "Could not read start offset from connectome file" - toPtr <- fromAndToPtr.lift(1).toFox ?~> "Could not read end offset from connectome file" - columnValues: Array[Long] <- if (toPtr - fromPtr == 0L) Fox.successful(Array.empty[Long]) - else - finishAccessOnFailure(cachedConnectomeFile) { - cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset("/CSR_indices", (toPtr - fromPtr).toInt, fromPtr) - } ?~> "Could not read agglomerate pairs from connectome file" - columnOffset <- searchSorted(columnValues, dstAgglomerateId) - pairIndex = fromPtr + columnOffset - synapses <- if ((columnOffset >= columnValues.length) || (columnValues(columnOffset) != dstAgglomerateId)) - Fox.successful(List.empty) - else - for { - fromAndTo <- finishAccessOnFailure(cachedConnectomeFile) { - cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset("/agglomerate_pair_offsets", 2, pairIndex) - } - 
from <- fromAndTo.lift(0).toFox - to <- fromAndTo.lift(1).toFox - } yield List.range(from, to) - } yield synapses - - private def searchSorted(haystack: Array[Long], needle: Long): Fox[Int] = - haystack.search(needle) match { - case Found(i) => Fox.successful(i) - case InsertionPoint(i) => Fox.successful(i) - } - - private def finishAccessOnFailure[T](f: CachedHdf5File)(block: => T): Fox[T] = - tryo { _: Throwable => - f.finishAccess() - } { - block - }.toFox - -} diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/connectome/ConnectomeFileService.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/connectome/ConnectomeFileService.scala new file mode 100644 index 00000000000..3d74607967e --- /dev/null +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/connectome/ConnectomeFileService.scala @@ -0,0 +1,303 @@ +package com.scalableminds.webknossos.datastore.services.connectome + +import com.scalableminds.util.accesscontext.TokenContext +import com.scalableminds.util.cache.AlfuCache +import com.scalableminds.util.io.PathUtils +import com.scalableminds.util.tools.Box.tryo +import com.scalableminds.util.tools.{Box, Fox, FoxImplicits} +import com.scalableminds.webknossos.datastore.DataStoreConfig +import com.scalableminds.webknossos.datastore.models.datasource.{ + DataLayer, + DataSourceId, + LayerAttachment, + LayerAttachmentDataformat +} +import com.scalableminds.webknossos.datastore.services.connectome.SynapticPartnerDirection.SynapticPartnerDirection +import com.scalableminds.webknossos.datastore.storage.RemoteSourceDescriptorService +import com.typesafe.scalalogging.LazyLogging +import org.apache.commons.io.FilenameUtils +import play.api.i18n.{Messages, MessagesProvider} +import play.api.libs.json.{Json, OFormat} + +import java.nio.file.Paths +import javax.inject.Inject +import scala.collection.mutable.ListBuffer +import scala.concurrent.ExecutionContext + +case class ByAgglomerateIdsRequest( + connectomeFile: String, + agglomerateIds: Seq[Long] +) + +object ByAgglomerateIdsRequest { + implicit val jsonFormat: OFormat[ByAgglomerateIdsRequest] = Json.format[ByAgglomerateIdsRequest] +} + +case class BySynapseIdsRequest( + connectomeFile: String, + synapseIds: List[Long] +) + +object BySynapseIdsRequest { + implicit val jsonFormat: OFormat[BySynapseIdsRequest] = Json.format[BySynapseIdsRequest] +} + +case class DirectedSynapseList( + in: Seq[Long], + out: Seq[Long] +) + +object DirectedSynapseList { + implicit val jsonFormat: OFormat[DirectedSynapseList] = Json.format[DirectedSynapseList] +} + +case class DirectedSynapseListMutable( + in: ListBuffer[Long], + out: ListBuffer[Long] +) { + def freeze: DirectedSynapseList = DirectedSynapseList(in.toList, out.toList) +} + +object DirectedSynapseListMutable { + def empty: DirectedSynapseListMutable = DirectedSynapseListMutable(ListBuffer(), ListBuffer()) +} + +case class SynapseTypesWithLegend( + synapseTypes: Seq[Long], + typeToString: Seq[String], +) + +object SynapseTypesWithLegend { + implicit val jsonFormat: OFormat[SynapseTypesWithLegend] = Json.format[SynapseTypesWithLegend] +} + +case class ConnectomeFileNameWithMappingName( + connectomeFileName: String, + mappingName: String +) + +object ConnectomeFileNameWithMappingName { + implicit val jsonFormat: OFormat[ConnectomeFileNameWithMappingName] = Json.format[ConnectomeFileNameWithMappingName] +} + +case class ConnectomeFileKey(dataSourceId: DataSourceId, layerName: String, attachment: LayerAttachment) + 
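Note on the request case classes above: they use Play's macro-derived formats, so the JSON keys sent by the controller endpoints mirror the case class field names. A minimal sketch of the expected payloads, with invented example values, assuming the case classes from this file are in scope:

```scala
import play.api.libs.json.Json

// Hypothetical request bodies; only the field names are prescribed by the case classes above.
val byAgglomerates = ByAgglomerateIdsRequest(connectomeFile = "connectome_2024", agglomerateIds = Seq(4L, 7L))
val bySynapses = BySynapseIdsRequest(connectomeFile = "connectome_2024", synapseIds = List(12L, 13L))

Json.toJson(byAgglomerates) // {"connectomeFile":"connectome_2024","agglomerateIds":[4,7]}
Json.toJson(bySynapses)     // {"connectomeFile":"connectome_2024","synapseIds":[12,13]}
```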
+class ConnectomeFileService @Inject()(config: DataStoreConfig, + remoteSourceDescriptorService: RemoteSourceDescriptorService, + hdf5ConnectomeFileService: Hdf5ConnectomeFileService, + zarrConnectomeFileService: ZarrConnectomeFileService) + extends FoxImplicits + with LazyLogging { + + private val dataBaseDir = Paths.get(config.Datastore.baseDirectory) + private val localConnectomesDir = "connectomes" + private val hdf5ConnectomeFileExtension = "hdf5" + + private val connectomeFileKeyCache + : AlfuCache[(DataSourceId, String, String), ConnectomeFileKey] = AlfuCache() // dataSourceId, layerName, connectomeFileName → ConnectomeFileKey + + def lookUpConnectomeFileKey(dataSourceId: DataSourceId, dataLayer: DataLayer, connectomeFileName: String)( + implicit ec: ExecutionContext): Fox[ConnectomeFileKey] = + connectomeFileKeyCache.getOrLoad( + (dataSourceId, dataLayer.name, connectomeFileName), + _ => lookUpConnectomeFileKeyImpl(dataSourceId, dataLayer, connectomeFileName).toFox) + + private def lookUpConnectomeFileKeyImpl(dataSourceId: DataSourceId, + dataLayer: DataLayer, + connectomeFileName: String): Box[ConnectomeFileKey] = { + val registeredAttachment: Option[LayerAttachment] = dataLayer.attachments match { + case Some(attachments) => attachments.connectomes.find(_.name == connectomeFileName) + case None => None + } + val localDatasetDir = dataBaseDir.resolve(dataSourceId.organizationId).resolve(dataSourceId.directoryName) + for { + registeredAttachmentNormalized <- tryo(registeredAttachment.map { attachment => + attachment.copy( + path = + remoteSourceDescriptorService.uriFromPathLiteral(attachment.path.toString, localDatasetDir, dataLayer.name)) + }) + localFallbackAttachment = LayerAttachment( + connectomeFileName, + localDatasetDir + .resolve(dataLayer.name) + .resolve(localConnectomesDir) + .resolve(connectomeFileName + "." 
+ hdf5ConnectomeFileExtension) + .toUri, + LayerAttachmentDataformat.hdf5 + ) + selectedAttachment = registeredAttachmentNormalized.getOrElse(localFallbackAttachment) + } yield + ConnectomeFileKey( + dataSourceId, + dataLayer.name, + selectedAttachment + ) + } + + def listConnectomeFiles(dataSourceId: DataSourceId, dataLayer: DataLayer)( + implicit ec: ExecutionContext, + tc: TokenContext, + m: MessagesProvider): Fox[List[ConnectomeFileNameWithMappingName]] = { + val attachedConnectomeFileNames = dataLayer.attachments.map(_.connectomes).getOrElse(Seq.empty).map(_.name).toSet + + val layerDir = + dataBaseDir.resolve(dataSourceId.organizationId).resolve(dataSourceId.directoryName).resolve(dataLayer.name) + val scannedConnectomeFileNames = PathUtils + .listFiles(layerDir.resolve(localConnectomesDir), + silent = true, + PathUtils.fileExtensionFilter(hdf5ConnectomeFileExtension)) + .map { paths => + paths.map(path => FilenameUtils.removeExtension(path.getFileName.toString)) + } + .toOption + .getOrElse(Nil) + .toSet + + val allConnectomeFileNames = attachedConnectomeFileNames ++ scannedConnectomeFileNames + + Fox.fromFuture( + Fox + .serialSequence(allConnectomeFileNames.toSeq) { connectomeFileName => + for { + connectomeFileKey <- lookUpConnectomeFileKey(dataSourceId, dataLayer, connectomeFileName) ?~> Messages( + "connectome.file.lookup.failed", + connectomeFileName) + mappingName <- mappingNameForConnectomeFile(connectomeFileKey) ?~> Messages( + "connectome.file.readMappingName.failed", + connectomeFileName) + } yield ConnectomeFileNameWithMappingName(connectomeFileName, mappingName) + } + // Only return successes, we don’t want a malformed file breaking the list request. + .map(_.flatten)) + } + + private def mappingNameForConnectomeFile(connectomeFileKey: ConnectomeFileKey)(implicit ec: ExecutionContext, + tc: TokenContext): Fox[String] = + connectomeFileKey.attachment.dataFormat match { + case LayerAttachmentDataformat.zarr3 => zarrConnectomeFileService.mappingNameForConnectomeFile(connectomeFileKey) + case LayerAttachmentDataformat.hdf5 => hdf5ConnectomeFileService.mappingNameForConnectomeFile(connectomeFileKey) + case _ => unsupportedDataFormat(connectomeFileKey) + } + + def synapsesForAgglomerates(connectomeFileKey: ConnectomeFileKey, agglomerateIds: Seq[Long])( + implicit ec: ExecutionContext, + tc: TokenContext): Fox[Seq[DirectedSynapseList]] = + if (agglomerateIds.length == 1) { + for { + agglomerateId <- agglomerateIds.headOption.toFox ?~> "Failed to extract the single agglomerate ID from request" + inSynapses <- ingoingSynapsesForAgglomerate(connectomeFileKey, agglomerateId) ?~> "Failed to read ingoing synapses" + outSynapses <- outgoingSynapsesForAgglomerate(connectomeFileKey, agglomerateId) ?~> "Failed to read outgoing synapses" + } yield List(DirectedSynapseList(inSynapses, outSynapses)) + } else { + val agglomeratePairs = directedPairs(agglomerateIds.toSet.toSeq) + for { + synapsesPerPair <- Fox.serialCombined(agglomeratePairs)(pair => + synapseIdsForDirectedPair(connectomeFileKey, pair._1, pair._2)) + synapseListsMap = gatherPairSynapseLists(agglomerateIds, agglomeratePairs, synapsesPerPair) + synapseListsOrdered = agglomerateIds.map(id => synapseListsMap(id)) + } yield synapseListsOrdered + } + + private def directedPairs(items: Seq[Long]): Seq[(Long, Long)] = + (for { x <- items; y <- items } yield (x, y)).filter(pair => pair._1 != pair._2) + + private def gatherPairSynapseLists(agglomerateIds: Seq[Long], + agglomeratePairs: Seq[(Long, Long)], + synapsesPerPair: 
List[Seq[Long]]): collection.Map[Long, DirectedSynapseList] = { + val directedSynapseListsMutable = scala.collection.mutable.Map[Long, DirectedSynapseListMutable]() + agglomerateIds.foreach { agglomerateId => + directedSynapseListsMutable(agglomerateId) = DirectedSynapseListMutable.empty + } + agglomeratePairs.zip(synapsesPerPair).foreach { pairWithSynapses: ((Long, Long), Seq[Long]) => + val srcAgglomerate = pairWithSynapses._1._1 + val dstAgglomerate = pairWithSynapses._1._2 + directedSynapseListsMutable(srcAgglomerate).out ++= pairWithSynapses._2 + directedSynapseListsMutable(dstAgglomerate).in ++= pairWithSynapses._2 + } + directedSynapseListsMutable.view.mapValues(_.freeze).toMap + } + + private def ingoingSynapsesForAgglomerate(connectomeFileKey: ConnectomeFileKey, agglomerateId: Long)( + implicit ec: ExecutionContext, + tc: TokenContext): Fox[Seq[Long]] = + connectomeFileKey.attachment.dataFormat match { + case LayerAttachmentDataformat.zarr3 => + zarrConnectomeFileService.ingoingSynapsesForAgglomerate(connectomeFileKey, agglomerateId) + case LayerAttachmentDataformat.hdf5 => + hdf5ConnectomeFileService.ingoingSynapsesForAgglomerate(connectomeFileKey, agglomerateId) + case _ => unsupportedDataFormat(connectomeFileKey) + } + + private def outgoingSynapsesForAgglomerate(connectomeFileKey: ConnectomeFileKey, agglomerateId: Long)( + implicit ec: ExecutionContext, + tc: TokenContext): Fox[Seq[Long]] = + connectomeFileKey.attachment.dataFormat match { + case LayerAttachmentDataformat.zarr3 => + zarrConnectomeFileService.outgoingSynapsesForAgglomerate(connectomeFileKey, agglomerateId) + case LayerAttachmentDataformat.hdf5 => + hdf5ConnectomeFileService.outgoingSynapsesForAgglomerate(connectomeFileKey, agglomerateId) + case _ => unsupportedDataFormat(connectomeFileKey) + } + + private def synapseIdsForDirectedPair( + connectomeFileKey: ConnectomeFileKey, + srcAgglomerateId: Long, + dstAgglomerateId: Long)(implicit ec: ExecutionContext, tc: TokenContext): Fox[Seq[Long]] = + connectomeFileKey.attachment.dataFormat match { + case LayerAttachmentDataformat.zarr3 => + zarrConnectomeFileService.synapseIdsForDirectedPair(connectomeFileKey, srcAgglomerateId, dstAgglomerateId) + case LayerAttachmentDataformat.hdf5 => + hdf5ConnectomeFileService.synapseIdsForDirectedPair(connectomeFileKey, srcAgglomerateId, dstAgglomerateId) + case _ => unsupportedDataFormat(connectomeFileKey) + } + + def synapticPartnerForSynapses( + connectomeFileKey: ConnectomeFileKey, + synapseIds: List[Long], + direction: SynapticPartnerDirection)(implicit ec: ExecutionContext, tc: TokenContext): Fox[List[Long]] = + connectomeFileKey.attachment.dataFormat match { + case LayerAttachmentDataformat.zarr3 => + zarrConnectomeFileService.synapticPartnerForSynapses(connectomeFileKey, synapseIds, direction) + case LayerAttachmentDataformat.hdf5 => + hdf5ConnectomeFileService.synapticPartnerForSynapses(connectomeFileKey, synapseIds, direction) + case _ => unsupportedDataFormat(connectomeFileKey) + } + + def positionsForSynapses(connectomeFileKey: ConnectomeFileKey, synapseIds: List[Long])( + implicit ec: ExecutionContext, + tc: TokenContext): Fox[Seq[Seq[Long]]] = + connectomeFileKey.attachment.dataFormat match { + case LayerAttachmentDataformat.zarr3 => + zarrConnectomeFileService.positionsForSynapses(connectomeFileKey, synapseIds) + case LayerAttachmentDataformat.hdf5 => + hdf5ConnectomeFileService.positionsForSynapses(connectomeFileKey, synapseIds) + case _ => unsupportedDataFormat(connectomeFileKey) + } + + def 
typesForSynapses(connectomeFileKey: ConnectomeFileKey, synapseIds: List[Long])( + implicit ec: ExecutionContext, + tc: TokenContext): Fox[SynapseTypesWithLegend] = + connectomeFileKey.attachment.dataFormat match { + case LayerAttachmentDataformat.zarr3 => zarrConnectomeFileService.typesForSynapses(connectomeFileKey, synapseIds) + case LayerAttachmentDataformat.hdf5 => hdf5ConnectomeFileService.typesForSynapses(connectomeFileKey, synapseIds) + case _ => unsupportedDataFormat(connectomeFileKey) + } + + def clearCache(dataSourceId: DataSourceId, layerNameOpt: Option[String]): Int = { + connectomeFileKeyCache.clear { + case (keyDataSourceId, keyLayerName, _) => + dataSourceId == keyDataSourceId && layerNameOpt.forall(_ == keyLayerName) + } + + val clearedHdf5Count = hdf5ConnectomeFileService.clearCache(dataSourceId, layerNameOpt) + + val clearedZarrCount = zarrConnectomeFileService.clearCache(dataSourceId, layerNameOpt) + + clearedHdf5Count + clearedZarrCount + } + + private def unsupportedDataFormat(connectomeFileKey: ConnectomeFileKey)(implicit ec: ExecutionContext) = + Fox.failure( + s"Trying to load connectome file with unsupported data format ${connectomeFileKey.attachment.dataFormat}") + +} diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/connectome/ConnectomeFileUtils.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/connectome/ConnectomeFileUtils.scala new file mode 100644 index 00000000000..51bf2a8315c --- /dev/null +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/connectome/ConnectomeFileUtils.scala @@ -0,0 +1,25 @@ +package com.scalableminds.webknossos.datastore.services.connectome + +import com.scalableminds.webknossos.datastore.services.connectome.SynapticPartnerDirection.SynapticPartnerDirection + +trait ConnectomeFileUtils { + + protected val keyCsrIndptr = "CSR_indptr" + protected val keyCscIndptr = "CSC_indptr" + protected val keyCsrIndices = "CSR_indices" + protected val keyAgglomeratePairOffsets = "agglomerate_pair_offsets" + protected val keyCscAgglomeratePair = "CSC_agglomerate_pair" + protected val keySynapseTypes = "synapse_types" + protected val keySynapsePositions = "synapse_positions" + protected val keySynapseToSrcAgglomerate = "synapse_to_src_agglomerate" + protected val keySynapseToDstAgglomerate = "synapse_to_dst_agglomerate" + + protected val attrKeyMetadataMappingName = "metadata/mapping_name" + protected val attrKeySynapseTypeNames = "synapse_type_names" + + protected def synapticPartnerKey(direction: SynapticPartnerDirection): String = + direction match { + case SynapticPartnerDirection.src => keySynapseToSrcAgglomerate + case SynapticPartnerDirection.dst => keySynapseToDstAgglomerate + } +} diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/connectome/Hdf5ConnectomeFileService.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/connectome/Hdf5ConnectomeFileService.scala new file mode 100644 index 00000000000..cc3d5b6402c --- /dev/null +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/connectome/Hdf5ConnectomeFileService.scala @@ -0,0 +1,179 @@ +package com.scalableminds.webknossos.datastore.services.connectome + +import com.scalableminds.util.collections.SequenceUtils +import com.scalableminds.util.tools.Box.tryo +import com.scalableminds.util.tools.{Fox, FoxImplicits} +import com.scalableminds.webknossos.datastore.models.datasource.DataSourceId +import 
com.scalableminds.webknossos.datastore.services.connectome.SynapticPartnerDirection.SynapticPartnerDirection +import com.scalableminds.webknossos.datastore.storage.{CachedHdf5File, Hdf5FileCache} +import com.scalableminds.webknossos.datastore.DataStoreConfig + +import java.nio.file.Paths +import javax.inject.Inject +import scala.concurrent.ExecutionContext + +class Hdf5ConnectomeFileService @Inject()(config: DataStoreConfig) extends FoxImplicits with ConnectomeFileUtils { + + private val dataBaseDir = Paths.get(config.Datastore.baseDirectory) + + private lazy val fileHandleCache = new Hdf5FileCache(30) + + // Cannot read type names from the hdf5 file due to a limitation in jhdf5. + // However, all existing hdf5 connectome files have this exact type name set. + // Also compare https://scm.slack.com/archives/C5AKLAV0B/p1750852209211939?thread_ts=1705502230.128199&cid=C5AKLAV0B + private lazy val legacySynapseTypeNames = List("dendritic-shaft-synapse", "spine-head-synapse", "soma-synapse") + + def mappingNameForConnectomeFile(connectomeFileKey: ConnectomeFileKey)(implicit ec: ExecutionContext): Fox[String] = + for { + cachedConnectomeFile <- fileHandleCache + .getCachedHdf5File(connectomeFileKey.attachment)(CachedHdf5File.fromPath) + .toFox ?~> "connectome.file.open.failed" + mappingName <- finishAccessOnFailure(cachedConnectomeFile) { + cachedConnectomeFile.stringReader.getAttr("/", attrKeyMetadataMappingName) + } ?~> "connectome.file.readEncoding.failed" + _ = cachedConnectomeFile.finishAccess() + } yield mappingName + + def ingoingSynapsesForAgglomerate(connectomeFileKey: ConnectomeFileKey, agglomerateId: Long)( + implicit ec: ExecutionContext): Fox[Seq[Long]] = + for { + cachedConnectomeFile <- fileHandleCache + .getCachedHdf5File(connectomeFileKey.attachment)(CachedHdf5File.fromPath) + .toFox ?~> "connectome.file.open.failed" + fromAndToPtr: Array[Long] <- finishAccessOnFailure(cachedConnectomeFile) { + cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset(keyCscIndptr, 2, agglomerateId) + } ?~> "Could not read offsets from connectome file" + fromPtr <- fromAndToPtr.lift(0).toFox ?~> "Could not read start offset from connectome file" + toPtr <- fromAndToPtr.lift(1).toFox ?~> "Could not read end offset from connectome file" + _ <- Fox.fromBool(fromPtr <= toPtr) ?~> s"Agglomerate $agglomerateId not present in agglomerate file" + // readArrayBlockWithOffset has a bug and does not return the empty array when block size 0 is passed, hence the if. 
+ agglomeratePairs: Array[Long] <- if (toPtr - fromPtr == 0L) Fox.successful(Array.empty[Long]) + else + finishAccessOnFailure(cachedConnectomeFile) { + cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset(keyCscAgglomeratePair, + (toPtr - fromPtr).toInt, + fromPtr) + } ?~> "Could not read agglomerate pairs from connectome file" + synapseIdsNested <- Fox.serialCombined(agglomeratePairs.toList) { agglomeratePair: Long => + for { + from <- finishAccessOnFailure(cachedConnectomeFile) { + cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset(keyAgglomeratePairOffsets, 1, agglomeratePair) + }.flatMap(_.headOption.toFox) + to <- finishAccessOnFailure(cachedConnectomeFile) { + cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset(keyAgglomeratePairOffsets, + 1, + agglomeratePair + 1) + }.flatMap(_.headOption.toFox) + } yield List.range(from, to) + } ?~> "Could not read ingoing synapses from connectome file" + _ = cachedConnectomeFile.finishAccess() + } yield synapseIdsNested.flatten + + def outgoingSynapsesForAgglomerate(connectomeFileKey: ConnectomeFileKey, agglomerateId: Long)( + implicit ec: ExecutionContext): Fox[Seq[Long]] = + for { + cachedConnectomeFile <- fileHandleCache + .getCachedHdf5File(connectomeFileKey.attachment)(CachedHdf5File.fromPath) + .toFox ?~> "connectome.file.open.failed" + fromAndToPtr: Array[Long] <- finishAccessOnFailure(cachedConnectomeFile) { + cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset(keyCsrIndptr, 2, agglomerateId) + } ?~> "Could not read offsets from connectome file" + fromPtr <- fromAndToPtr.lift(0).toFox ?~> "Could not read start offset from connectome file" + toPtr <- fromAndToPtr.lift(1).toFox ?~> "Could not read end offset from connectome file" + from <- finishAccessOnFailure(cachedConnectomeFile) { + cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset(keyAgglomeratePairOffsets, 1, fromPtr) + }.flatMap(_.headOption.toFox) ?~> "Could not read synapses from connectome file" + to <- finishAccessOnFailure(cachedConnectomeFile) { + cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset(keyAgglomeratePairOffsets, 1, toPtr) + }.flatMap(_.headOption.toFox) ?~> "Could not read synapses from connectome file" + _ = cachedConnectomeFile.finishAccess() + } yield Seq.range(from, to) + + def synapticPartnerForSynapses(connectomeFileKey: ConnectomeFileKey, + synapseIds: List[Long], + direction: SynapticPartnerDirection)(implicit ec: ExecutionContext): Fox[List[Long]] = + for { + cachedConnectomeFile <- fileHandleCache + .getCachedHdf5File(connectomeFileKey.attachment)(CachedHdf5File.fromPath) + .toFox ?~> "connectome.file.open.failed" + agglomerateIds <- Fox.serialCombined(synapseIds) { synapseId: Long => + finishAccessOnFailure(cachedConnectomeFile) { + cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset(synapticPartnerKey(direction), 1, synapseId) + }.flatMap(_.headOption.toFox) + } + _ = cachedConnectomeFile.finishAccess() + } yield agglomerateIds + + def positionsForSynapses(connectomeFileKey: ConnectomeFileKey, synapseIds: List[Long])( + implicit ec: ExecutionContext): Fox[List[List[Long]]] = + for { + cachedConnectomeFile <- fileHandleCache + .getCachedHdf5File(connectomeFileKey.attachment)(CachedHdf5File.fromPath) + .toFox ?~> "connectome.file.open.failed" + synapsePositions <- Fox.serialCombined(synapseIds) { synapseId: Long => + finishAccessOnFailure(cachedConnectomeFile) { + cachedConnectomeFile.uint64Reader.readMatrixBlockWithOffset(keySynapsePositions, 1, 3, synapseId, 0) + }.flatMap(_.headOption.toFox) + } + _ = 
cachedConnectomeFile.finishAccess() + } yield synapsePositions.map(_.toList) + + def typesForSynapses(connectomeFileKey: ConnectomeFileKey, synapseIds: List[Long])( + implicit ec: ExecutionContext): Fox[SynapseTypesWithLegend] = + for { + cachedConnectomeFile <- fileHandleCache + .getCachedHdf5File(connectomeFileKey.attachment)(CachedHdf5File.fromPath) + .toFox ?~> "connectome.file.open.failed" + // Hard coded type name list, as all legacy files have this value. + synapseTypes <- Fox.serialCombined(synapseIds) { synapseId: Long => + finishAccessOnFailure(cachedConnectomeFile) { + cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset(keySynapseTypes, 1, synapseId) + }.flatMap(_.headOption.toFox) + } + _ = cachedConnectomeFile.finishAccess() + } yield SynapseTypesWithLegend(synapseTypes, legacySynapseTypeNames) + + def synapseIdsForDirectedPair(connectomeFileKey: ConnectomeFileKey, srcAgglomerateId: Long, dstAgglomerateId: Long)( + implicit ec: ExecutionContext): Fox[Seq[Long]] = + for { + cachedConnectomeFile <- fileHandleCache + .getCachedHdf5File(connectomeFileKey.attachment)(CachedHdf5File.fromPath) + .toFox ?~> "connectome.file.open.failed" + fromAndToPtr: Array[Long] <- finishAccessOnFailure(cachedConnectomeFile) { + cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset(keyCsrIndptr, 2, srcAgglomerateId) + } ?~> "Could not read offsets from connectome file" + fromPtr <- fromAndToPtr.lift(0).toFox ?~> "Could not read start offset from connectome file" + toPtr <- fromAndToPtr.lift(1).toFox ?~> "Could not read end offset from connectome file" + columnValues: Array[Long] <- if (toPtr - fromPtr == 0L) Fox.successful(Array.empty[Long]) + else + finishAccessOnFailure(cachedConnectomeFile) { + cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset(keyCsrIndices, (toPtr - fromPtr).toInt, fromPtr) + } ?~> "Could not read agglomerate pairs from connectome file" + columnOffset = SequenceUtils.searchSorted(columnValues, dstAgglomerateId) + pairIndex = fromPtr + columnOffset + synapses <- if ((columnOffset >= columnValues.length) || (columnValues(columnOffset) != dstAgglomerateId)) + Fox.successful(List.empty) + else + for { + fromAndTo <- finishAccessOnFailure(cachedConnectomeFile) { + cachedConnectomeFile.uint64Reader.readArrayBlockWithOffset(keyAgglomeratePairOffsets, 2, pairIndex) + } + from <- fromAndTo.lift(0).toFox + to <- fromAndTo.lift(1).toFox + } yield Seq.range(from, to) + _ = cachedConnectomeFile.finishAccess() + } yield synapses + + private def finishAccessOnFailure[T](f: CachedHdf5File)(block: => T)(implicit ec: ExecutionContext): Fox[T] = + tryo { _: Throwable => + f.finishAccess() + } { + block + }.toFox + + def clearCache(dataSourceId: DataSourceId, layerNameOpt: Option[String]): Int = { + val datasetPath = dataBaseDir.resolve(dataSourceId.organizationId).resolve(dataSourceId.directoryName) + val relevantPath = layerNameOpt.map(l => datasetPath.resolve(l)).getOrElse(datasetPath) + fileHandleCache.clear(key => key.startsWith(relevantPath.toString)) + } +} diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/connectome/SynapticPartnerDirection.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/connectome/SynapticPartnerDirection.scala new file mode 100644 index 00000000000..2696ec25f8d --- /dev/null +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/connectome/SynapticPartnerDirection.scala @@ -0,0 +1,9 @@ +package com.scalableminds.webknossos.datastore.services.connectome + 
+import com.scalableminds.util.enumeration.ExtendedEnumeration + +object SynapticPartnerDirection extends ExtendedEnumeration { + type SynapticPartnerDirection = Value + + val src, dst = Value +} diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/connectome/ZarrConnectomeFileService.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/connectome/ZarrConnectomeFileService.scala new file mode 100644 index 00000000000..c2eaf2ec6e0 --- /dev/null +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/connectome/ZarrConnectomeFileService.scala @@ -0,0 +1,204 @@ +package com.scalableminds.webknossos.datastore.services.connectome + +import com.scalableminds.util.accesscontext.TokenContext +import com.scalableminds.util.cache.AlfuCache +import com.scalableminds.util.collections.SequenceUtils +import com.scalableminds.util.tools.Box.tryo +import com.scalableminds.util.tools.{Fox, FoxImplicits, JsonHelper} +import com.scalableminds.webknossos.datastore.datareaders.DatasetArray +import com.scalableminds.webknossos.datastore.datareaders.zarr3.Zarr3Array +import com.scalableminds.webknossos.datastore.models.datasource.DataSourceId +import com.scalableminds.webknossos.datastore.services.{ChunkCacheService, VoxelyticsZarrArtifactUtils} +import com.scalableminds.webknossos.datastore.services.connectome.SynapticPartnerDirection.SynapticPartnerDirection +import com.scalableminds.webknossos.datastore.storage.RemoteSourceDescriptorService +import jakarta.inject.Inject +import play.api.libs.json.{JsResult, JsValue, Reads} + +import scala.concurrent.ExecutionContext + +case class ConnectomeFileAttributes( + formatVersion: Long, + mappingName: String, + synapseTypeNames: Seq[String] +) + +object ConnectomeFileAttributes extends VoxelyticsZarrArtifactUtils with ConnectomeFileUtils { + + implicit object ConnectomeFileAttributesZarr3GroupHeaderReads extends Reads[ConnectomeFileAttributes] { + override def reads(json: JsValue): JsResult[ConnectomeFileAttributes] = { + val connectomeFileAttrs = lookUpArtifactAttributes(json) + for { + formatVersion <- readArtifactSchemaVersion(json) + mappingName <- (connectomeFileAttrs \ attrKeyMetadataMappingName).validate[String] + synapseTypeNames <- (connectomeFileAttrs \ attrKeySynapseTypeNames).validate[Seq[String]] + } yield + ConnectomeFileAttributes( + formatVersion, + mappingName, + synapseTypeNames + ) + } + } +} + +class ZarrConnectomeFileService @Inject()(remoteSourceDescriptorService: RemoteSourceDescriptorService, + chunkCacheService: ChunkCacheService) + extends FoxImplicits + with ConnectomeFileUtils { + private lazy val openArraysCache = AlfuCache[(ConnectomeFileKey, String), DatasetArray]() + private lazy val attributesCache = AlfuCache[ConnectomeFileKey, ConnectomeFileAttributes]() + + private def readConnectomeFileAttributes(connectomeFileKey: ConnectomeFileKey)( + implicit ec: ExecutionContext, + tc: TokenContext): Fox[ConnectomeFileAttributes] = + attributesCache.getOrLoad( + connectomeFileKey, + _ => + for { + groupVaultPath <- remoteSourceDescriptorService.vaultPathFor(connectomeFileKey.attachment) + groupHeaderBytes <- (groupVaultPath / ConnectomeFileAttributes.FILENAME_ZARR_JSON) + .readBytes() ?~> "Could not read connectome file zarr group file." + connectomeFileAttributes <- JsonHelper + .parseAs[ConnectomeFileAttributes](groupHeaderBytes) + .toFox ?~> "Could not parse connectome file attributes from zarr group file." 
+ } yield connectomeFileAttributes + ) + + def mappingNameForConnectomeFile(connectomeFileKey: ConnectomeFileKey)(implicit ec: ExecutionContext, + tc: TokenContext): Fox[String] = + for { + attributes <- readConnectomeFileAttributes(connectomeFileKey) + } yield attributes.mappingName + + def synapticPartnerForSynapses( + connectomeFileKey: ConnectomeFileKey, + synapseIds: List[Long], + direction: SynapticPartnerDirection)(implicit ec: ExecutionContext, tc: TokenContext): Fox[List[Long]] = + for { + synapseToPartnerAgglomerateArray <- openZarrArray(connectomeFileKey, synapticPartnerKey(direction)) + agglomerateIds <- Fox.serialCombined(synapseIds) { synapseId: Long => + for { + agglomerateIdMA <- synapseToPartnerAgglomerateArray.readAsMultiArray(offset = synapseId, shape = 1) + agglomerateId <- tryo(agglomerateIdMA.getLong(0)).toFox + } yield agglomerateId + } + } yield agglomerateIds + + def positionsForSynapses(connectomeFileKey: ConnectomeFileKey, synapseIds: List[Long])( + implicit ec: ExecutionContext, + tc: TokenContext): Fox[Seq[Seq[Long]]] = + for { + arraySynapsePositions <- openZarrArray(connectomeFileKey, keySynapsePositions) + synapsePositions <- Fox.serialCombined(synapseIds) { synapseId: Long => + for { + synapsePositionMA <- arraySynapsePositions.readAsMultiArray(offset = Array(synapseId, 0), shape = Array(1, 3)) + synapsePosition <- tryo( + Seq(synapsePositionMA.getLong(0), synapsePositionMA.getLong(1), synapsePositionMA.getLong(2))).toFox + } yield synapsePosition + } + } yield synapsePositions + + def typesForSynapses(connectomeFileKey: ConnectomeFileKey, synapseIds: List[Long])( + implicit ec: ExecutionContext, + tc: TokenContext): Fox[SynapseTypesWithLegend] = + for { + arraySynapseTypes <- openZarrArray(connectomeFileKey, keySynapseTypes) + attributes <- readConnectomeFileAttributes(connectomeFileKey) + synapseTypes <- Fox.serialCombined(synapseIds) { synapseId: Long => + for { + synapseTypeMA <- arraySynapseTypes.readAsMultiArray(offset = synapseId, shape = 1) + synapseType <- tryo(synapseTypeMA.getLong(0)).toFox + } yield synapseType + } + } yield SynapseTypesWithLegend(synapseTypes, attributes.synapseTypeNames) + + def ingoingSynapsesForAgglomerate(connectomeFileKey: ConnectomeFileKey, agglomerateId: Long)( + implicit ec: ExecutionContext, + tc: TokenContext): Fox[List[Long]] = + for { + (fromPtr, toPtr) <- getToAndFromPtr(connectomeFileKey, agglomerateId, keyCscIndptr) + agglomeratePairOffsetsArray <- openZarrArray(connectomeFileKey, keyAgglomeratePairOffsets) + cscAgglomeratePairArray <- openZarrArray(connectomeFileKey, keyCscAgglomeratePair) + _ <- Fox.fromBool(fromPtr <= toPtr) ?~> s"Agglomerate $agglomerateId not present in agglomerate file" + agglomeratePairsMA <- cscAgglomeratePairArray.readAsMultiArray(offset = fromPtr, shape = (toPtr - fromPtr).toInt) + agglomeratePairs <- tryo(agglomeratePairsMA.getStorage.asInstanceOf[Array[Long]]).toFox + synapseIdsNested <- Fox.serialCombined(agglomeratePairs.toList) { agglomeratePair: Long => + for { + fromTo <- agglomeratePairOffsetsArray.readAsMultiArray(offset = agglomeratePair, shape = 2) + from <- tryo(fromTo.getLong(0)).toFox ?~> "Could not read start offset from connectome file" + to <- tryo(fromTo.getLong(1)).toFox ?~> "Could not read end offset from connectome file" + } yield Seq.range(from, to) + } + } yield synapseIdsNested.flatten + + private def getToAndFromPtr(connectomeFileKey: ConnectomeFileKey, agglomerateId: Long, arrayKey: String)( + implicit ec: ExecutionContext, + tc: TokenContext): Fox[(Long, 
+  private def getToAndFromPtr(connectomeFileKey: ConnectomeFileKey, agglomerateId: Long, arrayKey: String)(
+      implicit ec: ExecutionContext,
+      tc: TokenContext): Fox[(Long, Long)] =
+    for {
+      csrIndptrArray <- openZarrArray(connectomeFileKey, arrayKey)
+      fromAndToPtr <- csrIndptrArray.readAsMultiArray(offset = agglomerateId, shape = 2)
+      fromPtr <- tryo(fromAndToPtr.getLong(0)).toFox
+      toPtr <- tryo(fromAndToPtr.getLong(1)).toFox
+    } yield (fromPtr, toPtr)
+
+  def outgoingSynapsesForAgglomerate(connectomeFileKey: ConnectomeFileKey, agglomerateId: Long)(
+      implicit ec: ExecutionContext,
+      tc: TokenContext): Fox[Seq[Long]] =
+    for {
+      (fromPtr, toPtr) <- getToAndFromPtr(connectomeFileKey, agglomerateId, keyCsrIndptr)
+      agglomeratePairOffsetsArray <- openZarrArray(connectomeFileKey, keyAgglomeratePairOffsets)
+      fromMA <- agglomeratePairOffsetsArray.readAsMultiArray(offset = fromPtr, shape = 1)
+      from <- tryo(fromMA.getLong(0)).toFox
+      toMA <- agglomeratePairOffsetsArray.readAsMultiArray(offset = toPtr, shape = 1)
+      to <- tryo(toMA.getLong(0)).toFox
+    } yield Seq.range(from, to)
+
+  def synapseIdsForDirectedPair(connectomeFileKey: ConnectomeFileKey, srcAgglomerateId: Long, dstAgglomerateId: Long)(
+      implicit ec: ExecutionContext,
+      tc: TokenContext): Fox[Seq[Long]] =
+    for {
+      csrIndicesArray <- openZarrArray(connectomeFileKey, keyCsrIndices)
+      (fromPtr, toPtr) <- getToAndFromPtr(connectomeFileKey, srcAgglomerateId, keyCsrIndptr)
+      columnValuesMA <- csrIndicesArray.readAsMultiArray(offset = fromPtr, shape = (toPtr - fromPtr).toInt)
+      columnValues: Array[Long] <- tryo(columnValuesMA.getStorage.asInstanceOf[Array[Long]]).toFox
+      // columnValues holds the dst agglomerate ids of all pairs with this src and is expected to be sorted,
+      // so a binary search can locate dstAgglomerateId; a mismatch below means no such pair exists.
+      columnOffset = SequenceUtils.searchSorted(columnValues, dstAgglomerateId)
+      pairIndex = fromPtr + columnOffset
+      synapses <- if ((columnOffset >= columnValues.length) || (columnValues(columnOffset) != dstAgglomerateId))
+        Fox.successful(List.empty)
+      else
+        for {
+          agglomeratePairOffsetsArray <- openZarrArray(connectomeFileKey, keyAgglomeratePairOffsets)
+          fromAndTo <- agglomeratePairOffsetsArray.readAsMultiArray(offset = pairIndex, shape = 2)
+          from <- tryo(fromAndTo.getLong(0)).toFox
+          to <- tryo(fromAndTo.getLong(1)).toFox
+        } yield Seq.range(from, to)
+    } yield synapses
+
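+  // Arrays are opened lazily and cached per (ConnectomeFileKey, array name). The DataSourceId
+  // ("dummy", "unused") and layer name passed to Zarr3Array.open are placeholder values; the array is
+  // resolved purely via the vault path of the connectome file attachment.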
+  private def openZarrArray(connectomeFileKey: ConnectomeFileKey,
+                            zarrArrayName: String)(implicit ec: ExecutionContext, tc: TokenContext): Fox[DatasetArray] =
+    openArraysCache.getOrLoad(
+      (connectomeFileKey, zarrArrayName),
+      _ =>
+        for {
+          groupVaultPath <- remoteSourceDescriptorService.vaultPathFor(connectomeFileKey.attachment)
+          zarrArray <- Zarr3Array.open(groupVaultPath / zarrArrayName,
+                                       DataSourceId("dummy", "unused"),
+                                       "layer",
+                                       None,
+                                       None,
+                                       None,
+                                       chunkCacheService.sharedChunkContentsCache)
+        } yield zarrArray
+    )
+
+  def clearCache(dataSourceId: DataSourceId, layerNameOpt: Option[String]): Int = {
+    attributesCache.clear { connectomeFileKey =>
+      connectomeFileKey.dataSourceId == dataSourceId && layerNameOpt.forall(connectomeFileKey.layerName == _)
+    }
+
+    openArraysCache.clear {
+      case (connectomeFileKey, _) =>
+        connectomeFileKey.dataSourceId == dataSourceId && layerNameOpt.forall(connectomeFileKey.layerName == _)
+    }
+  }
+}
diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/mesh/MeshFileService.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/mesh/MeshFileService.scala
index 2d8a9c044cb..66271cd548e 100644
--- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/mesh/MeshFileService.scala
+++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/mesh/MeshFileService.scala
@@ -82,11 +82,11 @@ class MeshFileService @Inject()(config: DataStoreConfig,
   def lookUpMeshFileKey(dataSourceId: DataSourceId, dataLayer: DataLayer, meshFileName: String)(
       implicit ec: ExecutionContext): Fox[MeshFileKey] =
     meshFileKeyCache.getOrLoad((dataSourceId, dataLayer.name, meshFileName),
-                               _ => lookUpMeshFileImpl(dataSourceId, dataLayer, meshFileName).toFox)
+                               _ => lookUpMeshFileKeyImpl(dataSourceId, dataLayer, meshFileName).toFox)
 
-  private def lookUpMeshFileImpl(dataSourceId: DataSourceId,
-                                 dataLayer: DataLayer,
-                                 meshFileName: String): Box[MeshFileKey] = {
+  private def lookUpMeshFileKeyImpl(dataSourceId: DataSourceId,
+                                    dataLayer: DataLayer,
+                                    meshFileName: String): Box[MeshFileKey] = {
     val registeredAttachment: Option[LayerAttachment] = dataLayer.attachments match {
       case Some(attachments) => attachments.meshes.find(_.name == meshFileName)
       case None              => None
diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/mesh/ZarrMeshFileService.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/mesh/ZarrMeshFileService.scala
index d8af9a88c1b..1c9474a19de 100644
--- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/mesh/ZarrMeshFileService.scala
+++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/mesh/ZarrMeshFileService.scala
@@ -72,10 +72,11 @@ class ZarrMeshFileService @Inject()(chunkCacheService: ChunkCacheService,
                                      tc: TokenContext): Fox[MeshFileAttributes] =
     for {
       groupVaultPath <- remoteSourceDescriptorService.vaultPathFor(meshFileKey.attachment)
-      groupHeaderBytes <- (groupVaultPath / MeshFileAttributes.FILENAME_ZARR_JSON).readBytes()
+      groupHeaderBytes <- (groupVaultPath / MeshFileAttributes.FILENAME_ZARR_JSON)
+        .readBytes() ?~> "Could not read mesh file zarr group file."
       meshFileAttributes <- JsonHelper
         .parseAs[MeshFileAttributes](groupHeaderBytes)
-        .toFox ?~> "Could not parse meshFile attributes from zarr group file"
+        .toFox ?~> "Could not parse mesh file attributes from zarr group file."
     } yield meshFileAttributes
 
   private def readMeshFileAttributes(meshFileKey: MeshFileKey)(implicit ec: ExecutionContext,
diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/segmentindex/ZarrSegmentIndexFileService.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/segmentindex/ZarrSegmentIndexFileService.scala
index 15dc3c847b9..8852fbc19b3 100644
--- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/segmentindex/ZarrSegmentIndexFileService.scala
+++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/segmentindex/ZarrSegmentIndexFileService.scala
@@ -67,10 +67,11 @@ class ZarrSegmentIndexFileService @Inject()(remoteSourceDescriptorService: Remot
                                              tc: TokenContext): Fox[SegmentIndexFileAttributes] =
     for {
       groupVaultPath <- remoteSourceDescriptorService.vaultPathFor(segmentIndexFileKey.attachment)
-      groupHeaderBytes <- (groupVaultPath / SegmentIndexFileAttributes.FILENAME_ZARR_JSON).readBytes()
+      groupHeaderBytes <- (groupVaultPath / SegmentIndexFileAttributes.FILENAME_ZARR_JSON)
+        .readBytes() ?~> "Could not read segment index file zarr group file."
       segmentIndexFileAttributes <- JsonHelper
         .parseAs[SegmentIndexFileAttributes](groupHeaderBytes)
-        .toFox ?~> "Could not parse segment index file attributes from zarr group file"
+        .toFox ?~> "Could not parse segment index file attributes from zarr group file."
     } yield segmentIndexFileAttributes
 
   def readSegmentIndex(segmentIndexFileKey: SegmentIndexFileKey,
diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/storage/RemoteSourceDescriptorService.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/storage/RemoteSourceDescriptorService.scala
index 758a8bbe41f..d3010132b42 100644
--- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/storage/RemoteSourceDescriptorService.scala
+++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/storage/RemoteSourceDescriptorService.scala
@@ -77,8 +77,8 @@ class RemoteSourceDescriptorService @Inject()(dSRemoteWebknossosClient: DSRemote
         throw new Exception(
           s"Absolute path $localPath in local file system is not in path whitelist. Consider adding it to datastore.localDirectoryWhitelist")
       } else { // relative local path, resolve in dataset dir
-        val pathRelativeToDataset = localDatasetDir.resolve(localPath)
-        val pathRelativeToLayer = localDatasetDir.resolve(layerName).resolve(localPath)
+        val pathRelativeToDataset = localDatasetDir.resolve(localPath).normalize
+        val pathRelativeToLayer = localDatasetDir.resolve(layerName).resolve(localPath).normalize
         if (pathRelativeToDataset.toFile.exists) {
           pathRelativeToDataset.toUri
         } else {