Skip to content

feat: schedule支持最少任务数路由 #229 #232

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ enum class RouteStrategyEnum(
RANDOM(1, "随机"),
ROUND(2, "轮训"),
CONSISTENT(3, "一致性hash"),
LEAST_JOB(4,"最少任务数"),
SHARDING_BROADCAST(10, "分片广播");

override fun code() = code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.TRIGGER_CODE_ID
import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.TRIGGER_CODE_IDX_DEF
import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.TRIGGER_TIME_IDX
import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.TRIGGER_TIME_IDX_DEF
import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.WORKER_ADDRESS_IDX
import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.WORKER_ADDRESS_IDX_DEF
import org.springframework.data.mongodb.core.index.CompoundIndex
import org.springframework.data.mongodb.core.index.CompoundIndexes
import org.springframework.data.mongodb.core.mapping.Document
Expand All @@ -25,6 +27,7 @@ import java.time.LocalDateTime
CompoundIndex(name = TRIGGER_CODE_IDX, def = TRIGGER_CODE_IDX_DEF, background = true),
CompoundIndex(name = EXECUTION_CODE_IDX, def = EXECUTION_CODE_IDX_DEF, background = true),
CompoundIndex(name = ALARM_STATUS_IDX, def = ALARM_STATUS_IDX_DEF, background = true),
CompoundIndex(name = WORKER_ADDRESS_IDX, def = WORKER_ADDRESS_IDX_DEF, background = true),
)
data class TJobLog(

Expand Down Expand Up @@ -85,5 +88,7 @@ data class TJobLog(
const val EXECUTION_CODE_IDX_DEF = "{'executionCode': 1}"
const val ALARM_STATUS_IDX = "alarmStatus_idx"
const val ALARM_STATUS_IDX_DEF = "{'alarmStatus': 1}"
const val WORKER_ADDRESS_IDX = "workerAddress_idx"
const val WORKER_ADDRESS_IDX_DEF = "{'workerAddress': 1}"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,5 +170,11 @@ class MongoJobProvider(
return mongoTemplate.updateFirst(query, update, TJobLog::class.java).modifiedCount.toInt()
}

override fun countByWorkerAddress(executionCode: Int, workerAddress: String): Int {
val criteria = where(TJobLog::executionCode).`is`(executionCode).and(TJobLog::workerAddress).`is`(workerAddress)
val query = Query.query(criteria)
return mongoTemplate.count(query,"job_log").toInt()
}

data class IdEntity(val id: String)
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,12 @@ interface JobProvider {
* @param jobId 任务id
*/
fun deleteLogByJobId(jobId: String)

/**
* 查询worker上的任务数
* @param executionCode 执行结果码
* @param workerAddress worker地址
* @return 对应状态的任务数量
* */
fun countByWorkerAddress(executionCode: Int, workerAddress:String): Int
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.tencent.devops.schedule.manager.WorkerManager
import com.tencent.devops.schedule.provider.JobProvider
import com.tencent.devops.schedule.provider.LockProvider
import com.tencent.devops.schedule.provider.WorkerProvider
import com.tencent.devops.schedule.router.Routes
import com.tencent.devops.schedule.scheduler.DefaultJobScheduler
import com.tencent.devops.schedule.scheduler.JobScheduler
import com.tencent.devops.schedule.scheduler.ScheduleServerMetricsListener
Expand All @@ -24,6 +25,7 @@ import org.springframework.context.annotation.Import
@Import(
ScheduleServerWebConfiguration::class,
ScheduleServerMetricsListener::class,
Routes::class
)
class ScheduleServerAutoConfiguration {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ open class DefaultJobManager(
return jobProvider.findFailJobLogIds(limit)
}

override fun countByWorkerAddress(executionCode: Int, workerAddress: String): Int {
return jobProvider.countByWorkerAddress(executionCode, workerAddress)
}

/**
* 验证调度参数
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,12 @@ interface JobManager {
* @param executionMessage 执行结果
*/
fun completeJob(logId: String, executionCode: Int, executionMessage: String?)

/**
* 查询worker上的任务数
* @param executionCode 执行结果码
* @param workerAddress worker地址
* @return 对应状态的任务数量
* */
fun countByWorkerAddress(executionCode: Int, workerAddress:String): Int
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,43 @@
package com.tencent.devops.schedule.router

import com.tencent.devops.schedule.enums.RouteStrategyEnum
import com.tencent.devops.schedule.manager.JobManager
import com.tencent.devops.schedule.pojo.trigger.TriggerParam

/**
* 路由策略工具类
*/
object Routes {
class Routes(jobManager: JobManager) {

private val strategyMap = mapOf(
RouteStrategyEnum.RANDOM to WorkerRouteRandom(),
RouteStrategyEnum.ROUND to WorkerRouteRound(),
RouteStrategyEnum.CONSISTENT to WorkerRouteConsistentHash()
)
init {
Companion.jobManager = jobManager
}

companion object {
private lateinit var jobManager: JobManager
private val strategyMap by lazy {
mapOf(
RouteStrategyEnum.RANDOM to WorkerRouteRandom(),
RouteStrategyEnum.ROUND to WorkerRouteRound(),
RouteStrategyEnum.CONSISTENT to WorkerRouteConsistentHash(),
RouteStrategyEnum.LEAST_JOB to WorkerRouteLeastJob(jobManager),
)
}

/**
* 任务路由
* @param strategy 路由策略
* @param triggerParam 任务触发参数
* @param addressList worker地址列表
*
* @return 路由地址,可能为空
*/
fun route(
strategy: RouteStrategyEnum,
triggerParam: TriggerParam,
addressList: List<String>
): String? {
return strategyMap[strategy]?.route(triggerParam, addressList)
/**
* 任务路由
* @param strategy 路由策略
* @param triggerParam 任务触发参数
* @param addressList worker地址列表
*
* @return 路由地址,可能为空
*/
fun route(
strategy: RouteStrategyEnum,
triggerParam: TriggerParam,
addressList: List<String>
): String? {
return strategyMap[strategy]?.route(triggerParam, addressList)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.tencent.devops.schedule.router

import com.tencent.devops.schedule.enums.ExecutionCodeEnum
import com.tencent.devops.schedule.manager.JobManager
import com.tencent.devops.schedule.pojo.trigger.TriggerParam

class WorkerRouteLeastJob(private val jobManager: JobManager) : WorkerRouter {
override fun route(triggerParam: TriggerParam, addressList: List<String>): String? {
var selected: String? = null
var minJobs = Int.MAX_VALUE
addressList.forEach {
val countByWorkerAddress = jobManager.countByWorkerAddress(ExecutionCodeEnum.RUNNING.code(), it)
if (countByWorkerAddress < minJobs) {
selected = it
minJobs = countByWorkerAddress
}
}
return selected
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ export default (safe) => {
searchRange: true,
search: true
},
{
label: '调度节点',
prop: 'workerAddress',
},
{
label: '调度结果',
prop: 'triggerCode',
Expand Down
Loading