-
Notifications
You must be signed in to change notification settings - Fork 557
WIP: Add JobManager
#4287
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: series/3.x
Are you sure you want to change the base?
WIP: Add JobManager
#4287
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
/* | ||
* Copyright 2020-2025 Typelevel | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package cats.effect.std | ||
|
||
import cats.effect.kernel._ | ||
import cats.syntax.all._ | ||
|
||
/** | ||
* A `JobManager` allows you to launch `Jobs` in the background using a unique identifier. Then | ||
* you can use the identifier to query for the status of the job or cancel it. | ||
*/ | ||
trait JobManager[F[_], Id, S] { | ||
|
||
/** | ||
* Creates and launches the given `Job` in the background. If another Job with the same id was | ||
* already running, it will be cancelled before starting this one. | ||
*/ | ||
def startJob(id: Id, job: Resource[F, JobManager.Job[F, S]]): F[Unit] | ||
|
||
/** | ||
* Gets the status of the `Job` associated with the given `id`. If `id` doesn't exists or the | ||
* `Job` already finished then the returned value will be a `None`. | ||
*/ | ||
def getJobStatus(id: Id): F[Option[S]] | ||
Comment on lines
+37
to
+41
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I somewhat dislike the idea that both bad id and already finished return in a Any ideas? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By fully controlling IDs, we could have |
||
|
||
/** | ||
* Signals cancellation of the `Job` associated with the given `id`, and waits for its | ||
* completion. | ||
*/ | ||
def cancelJob(id: Id): F[Unit] | ||
Comment on lines
+43
to
+47
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we provide a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would that be essentially There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Conceptually yes, but also, since we already have a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, so I think my point is: if it's somehow better (performance, safety, whatever) than just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. AFAIK, it is safer since its life cycle is attached to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It does "nothing", as a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Then I guess as you said, it provides no value and users who don't want to wait on the cancellation can just |
||
} | ||
|
||
object JobManager { | ||
|
||
/** | ||
* Represents a job managed by a `JobManager`. | ||
*/ | ||
trait Job[F[_], S] { | ||
|
||
/** | ||
* Starts the logic of this `Job`. | ||
*/ | ||
def run: F[Unit] | ||
|
||
/** | ||
* Gets the status of this `Job`. | ||
*/ | ||
def getStatus: F[S] | ||
} | ||
Comment on lines
+55
to
+66
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Users would then implement this |
||
|
||
private final case class RunningJob[F[_], S]( | ||
status: F[S], | ||
cancel: F[Unit] | ||
) | ||
|
||
def apply[F[_], Id, S](implicit F: Concurrent[F]): Resource[F, JobManager[F, Id, S]] = | ||
for { | ||
supervisor <- Supervisor[F](await = true) | ||
jobsMap <- Resource.eval(MapRef[F, Id, RunningJob[F, S]]) | ||
} yield new JobManager[F, Id, S] { | ||
override def startJob(id: Id, jobR: Resource[F, Job[F, S]]): F[Unit] = { | ||
val runJob = jobR.use { job => | ||
supervisor.supervise(job.run).flatMap { fiber => | ||
jobsMap(id) | ||
.getAndSet( | ||
RunningJob( | ||
status = job.getStatus, | ||
cancel = fiber.cancel | ||
).some | ||
) | ||
.flatMap(_.traverse_(_.cancel)) >> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In case the same |
||
fiber.join | ||
} | ||
} | ||
|
||
supervisor.supervise(runJob).void | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't wait for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think that's okay. I think the point of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fair point. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I'm a little confused by why it's sent to the Supervisor... (I'm sure there is a reason, I just don't know it). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The main thing is that if the same There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, looking at the code, I realized that we also need to run the job setup ( Thus, I decided to use a |
||
} | ||
|
||
override def getJobStatus(id: Id): F[Option[S]] = | ||
jobsMap(id).get.flatMap(_.traverse(_.status)) | ||
|
||
override def cancelJob(id: Id): F[Unit] = | ||
jobsMap(id).getAndSet(None).flatMap(_.traverse_(_.cancel)) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On one had, I like the idea of users being able to use their own
Ids
.On the other, I think most users would benefit from a default using automatically generated
UUIDs
.Should we provide such default in some way?