news 2026/6/25 12:09:26

从零实现一个分布式调度器:任务分片与容错

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
从零实现一个分布式调度器:任务分片与容错

前言

你有没有想过:每天凌晨3点,系统是怎么自动执行数据同步、报表生成、缓存刷新这些定时任务的?如果任务执行失败了,系统会怎么处理?

分布式调度器是微服务架构中处理定时任务、异步任务的核心组件。

今天我们用C语言从零实现一个分布式调度器:

· 任务注册与发现
· Cron表达式解析
· 任务分片(并行执行)
· 故障转移(容错处理)
· 任务状态管理
· 监控告警

---

一、分布式调度器核心原理

1. 架构图

```
┌─────────────────────────────────────────────────────────────┐
│ 调度器集群 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 节点1 │ │ 节点2 │ │ 节点3 │ │
│ │ (Leader) │ │ (Follower) │ │ (Follower) │ │
│ └──────┬──────┘ └─────────────┘ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 任务分片策略 │ │
│ │ [0-33%] [33%-66%] [66%-100%] │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
```

2. 核心功能

功能 说明
任务注册 动态注册/注销任务
Cron调度 支持标准Cron表达式
任务分片 大任务拆分成小片并行
故障转移 节点宕机自动转移任务
任务状态 运行中/成功/失败/超时
可观测性 日志、指标、告警

---

二、完整代码实现

1. 基础数据结构

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include <errno.h>
#include <signal.h>

#define MAX_TASK_NAME 128
#define MAX_TASK_PARAM 256
#define MAX_NODES 100
#define MAX_SHARDS 100

// 任务状态
typedef enum {
TASK_IDLE = 0,
TASK_RUNNING,
TASK_SUCCESS,
TASK_FAILED,
TASK_TIMEOUT,
TASK_CANCELLED
} task_status_t;

// 任务分片
typedef struct task_shard {
int shard_index;
int total_shards;
char param[MAX_TASK_PARAM];
char node_id[32];
task_status_t status;
time_t start_time;
time_t end_time;
char error_msg[256];
} task_shard_t;

// 任务定义
typedef struct task_definition {
char name[MAX_TASK_NAME];
char cron_expr[64];
int shard_count;
int timeout_seconds;
int retry_count;
void (*execute)(task_shard_t *shard, void *context);
void *context;
struct task_definition *next;
} task_definition_t;

// 任务实例
typedef struct task_instance {
char task_name[MAX_TASK_NAME];
char instance_id[64];
task_shard_t *shards;
int shard_count;
int completed_shards;
task_status_t status;
time_t start_time;
time_t end_time;
struct task_instance *next;
} task_instance_t;

// 调度器节点
typedef struct scheduler_node {
char node_id[32];
char host[64];
int port;
int is_leader;
int is_healthy;
time_t last_heartbeat;
} scheduler_node_t;

// 分布式调度器
typedef struct distributed_scheduler {
char node_id[32];
task_definition_t *tasks;
task_instance_t *instances;
scheduler_node_t *nodes;
int node_count;
int running;
pthread_mutex_t mutex;
pthread_t schedule_thread;
pthread_t heartbeat_thread;
} distributed_scheduler_t;
```

2. Cron表达式解析

```c
// Cron字段
typedef struct cron_fields {
int minute[60];
int minute_count;
int hour[24];
int hour_count;
int day[31];
int day_count;
int month[12];
int month_count;
int weekday[7];
int weekday_count;
} cron_fields_t;

// 解析cron表达式(简化版)
int cron_parse(const char *expr, cron_fields_t *fields) {
// 格式: 分 时 日 月 周
// 示例: "0 0 3 * * *" 每天3点
// 简化实现:只支持 * 和数字
char parts[6][32];
int count = sscanf(expr, "%s %s %s %s %s",
parts[0], parts[1], parts[2], parts[3], parts[4]);
if (count != 5) return -1;

// 解析分钟
if (strcmp(parts[0], "*") == 0) {
fields->minute_count = 60;
for (int i = 0; i < 60; i++) fields->minute[i] = i;
} else {
fields->minute_count = 1;
fields->minute[0] = atoi(parts[0]);
}

// 解析小时
if (strcmp(parts[1], "*") == 0) {
fields->hour_count = 24;
for (int i = 0; i < 24; i++) fields->hour[i] = i;
} else {
fields->hour_count = 1;
fields->hour[0] = atoi(parts[1]);
}

// 解析日
if (strcmp(parts[2], "*") == 0) {
fields->day_count = 31;
for (int i = 1; i <= 31; i++) fields->day[i-1] = i;
} else {
fields->day_count = 1;
fields->day[0] = atoi(parts[2]);
}

// 解析月
if (strcmp(parts[3], "*") == 0) {
fields->month_count = 12;
for (int i = 1; i <= 12; i++) fields->month[i-1] = i;
} else {
fields->month_count = 1;
fields->month[0] = atoi(parts[3]);
}

// 解析周
if (strcmp(parts[4], "*") == 0) {
fields->weekday_count = 7;
for (int i = 0; i < 7; i++) fields->weekday[i] = i;
} else {
fields->weekday_count = 1;
fields->weekday[0] = atoi(parts[4]);
}

return 0;
}

// 检查时间是否匹配
int cron_match(cron_fields_t *fields, struct tm *tm) {
int match = 0;

// 检查分钟
for (int i = 0; i < fields->minute_count; i++) {
if (fields->minute[i] == tm->tm_min) { match = 1; break; }
}
if (!match) return 0;
match = 0;

// 检查小时
for (int i = 0; i < fields->hour_count; i++) {
if (fields->hour[i] == tm->tm_hour) { match = 1; break; }
}
if (!match) return 0;
match = 0;

// 检查日
for (int i = 0; i < fields->day_count; i++) {
if (fields->day[i] == tm->tm_mday) { match = 1; break; }
}
if (!match) return 0;
match = 0;

// 检查月
for (int i = 0; i < fields->month_count; i++) {
if (fields->month[i] == tm->tm_mon + 1) { match = 1; break; }
}
if (!match) return 0;
match = 0;

// 检查周
for (int i = 0; i < fields->weekday_count; i++) {
if (fields->weekday[i] == tm->tm_wday) { match = 1; break; }
}

return match;
}
```

3. 任务调度核心

```c
// 创建调度器
distributed_scheduler_t *scheduler_create(const char *node_id) {
distributed_scheduler_t *s = malloc(sizeof(distributed_scheduler_t));
strcpy(s->node_id, node_id);
s->tasks = NULL;
s->instances = NULL;
s->nodes = malloc(sizeof(scheduler_node_t) * MAX_NODES);
s->node_count = 0;
s->running = 1;
pthread_mutex_init(&s->mutex, NULL);

printf("调度器节点启动: %s\n", node_id);
return s;
}

// 注册任务
void scheduler_register_task(distributed_scheduler_t *s, const char *name,
const char *cron_expr, int shard_count,
int timeout_seconds, int retry_count,
void (*execute)(task_shard_t*, void*),
void *context) {
pthread_mutex_lock(&s->mutex);

task_definition_t *task = malloc(sizeof(task_definition_t));
strcpy(task->name, name);
strcpy(task->cron_expr, cron_expr);
task->shard_count = shard_count > 0 ? shard_count : 1;
task->timeout_seconds = timeout_seconds > 0 ? timeout_seconds : 300;
task->retry_count = retry_count > 0 ? retry_count : 3;
task->execute = execute;
task->context = context;
task->next = s->tasks;
s->tasks = task;

pthread_mutex_unlock(&s->mutex);
printf("[调度器] 注册任务: %s, cron=%s, shards=%d\n",
name, cron_expr, shard_count);
}

// 执行分片任务
void execute_shard_task(distributed_scheduler_t *s, task_definition_t *task,
task_instance_t *instance, int shard_idx) {
task_shard_t *shard = &instance->shards[shard_idx];
shard->shard_index = shard_idx;
shard->total_shards = task->shard_count;
strcpy(shard->node_id, s->node_id);
shard->status = TASK_RUNNING;
shard->start_time = time(NULL);

printf("[调度器] 执行分片 %d/%d: %s\n",
shard_idx+1, task->shard_count, task->name);

// 执行任务
if (task->execute) {
task->execute(shard, task->context);
shard->status = TASK_SUCCESS;
}

shard->end_time = time(NULL);

// 更新完成状态
pthread_mutex_lock(&s->mutex);
instance->completed_shards++;
if (instance->completed_shards >= task->shard_count) {
instance->status = TASK_SUCCESS;
instance->end_time = time(NULL);
}
pthread_mutex_unlock(&s->mutex);

printf("[调度器] 分片 %d/%d 完成: %s\n",
shard_idx+1, task->shard_count, task->name);
}

// 检查并触发任务
void scheduler_check_tasks(distributed_scheduler_t *s) {
time_t now = time(NULL);
struct tm *tm_now = localtime(&now);

pthread_mutex_lock(&s->mutex);

task_definition_t *task = s->tasks;
while (task) {
cron_fields_t fields;
if (cron_parse(task->cron_expr, &fields) == 0) {
if (cron_match(&fields, tm_now)) {
// 检查是否已有实例在运行
task_instance_t *inst = s->instances;
int running = 0;
while (inst) {
if (strcmp(inst->task_name, task->name) == 0 &&
inst->status == TASK_RUNNING) {
running = 1;
break;
}
inst = inst->next;
}

if (!running) {
// 创建新任务实例
task_instance_t *new_inst = malloc(sizeof(task_instance_t));
strcpy(new_inst->task_name, task->name);
snprintf(new_inst->instance_id, sizeof(new_inst->instance_id),
"%s-%ld", task->name, now);
new_inst->shard_count = task->shard_count;
new_inst->shards = malloc(sizeof(task_shard_t) * task->shard_count);
memset(new_inst->shards, 0, sizeof(task_shard_t) * task->shard_count);
new_inst->completed_shards = 0;
new_inst->status = TASK_RUNNING;
new_inst->start_time = now;
new_inst->end_time = 0;
new_inst->next = s->instances;
s->instances = new_inst;

printf("[调度器] 触发任务: %s, 分片数: %d\n",
task->name, task->shard_count);

// 执行所有分片
for (int i = 0; i < task->shard_count; i++) {
execute_shard_task(s, task, new_inst, i);
}
}
}
}
task = task->next;
}

pthread_mutex_unlock(&s->mutex);
}
```

4. 心跳与故障转移

```c
// 心跳检测
void *heartbeat_thread(void *arg) {
distributed_scheduler_t *s = (distributed_scheduler_t*)arg;

while (s->running) {
sleep(5);
// 这里会向其他节点发送心跳
// 简化实现:打印心跳日志
// printf("[心跳] %s 存活\n", s->node_id);
}
return NULL;
}

// 故障转移
void scheduler_failover(distributed_scheduler_t *s) {
pthread_mutex_lock(&s->mutex);

time_t now = time(NULL);

// 检查每个运行中的任务实例
task_instance_t *inst = s->instances;
while (inst) {
if (inst->status == TASK_RUNNING) {
// 检查是否有分片长时间未完成
int timeout_count = 0;
for (int i = 0; i < inst->shard_count; i++) {
if (inst->shards[i].status == TASK_RUNNING) {
time_t elapsed = now - inst->shards[i].start_time;
// 获取超时配置
task_definition_t *task = s->tasks;
int timeout = 300;
while (task) {
if (strcmp(task->name, inst->task_name) == 0) {
timeout = task->timeout_seconds;
break;
}
task = task->next;
}

if (elapsed > timeout) {
// 标记超时,需要重新调度
inst->shards[i].status = TASK_TIMEOUT;
strcpy(inst->shards[i].error_msg, "Task timeout, rescheduling");
timeout_count++;
}
}
}

if (timeout_count > 0) {
printf("[故障转移] 任务 %s 有 %d 个分片超时,重新调度\n",
inst->task_name, timeout_count);
// 实际会重新分配超时分片
}
}
inst = inst->next;
}

pthread_mutex_unlock(&s->mutex);
}
```

5. 示例任务

```c
// 示例任务1:数据同步
void data_sync_task(task_shard_t *shard, void *context) {
printf("[任务] 数据同步 分片 %d/%d: 同步数据范围 [%d, %d]\n",
shard->shard_index+1, shard->total_shards,
shard->shard_index * 1000 / shard->total_shards,
(shard->shard_index+1) * 1000 / shard->total_shards);
sleep(1); // 模拟执行
}

// 示例任务2:报表生成
void report_task(task_shard_t *shard, void *context) {
int *report_id = (int*)context;
printf("[任务] 报表生成 分片 %d/%d: 生成报表 %d 的切片 %d\n",
shard->shard_index+1, shard->total_shards, *report_id, shard->shard_index);
sleep(2); // 模拟执行
}

// 示例任务3:缓存刷新
void cache_refresh_task(task_shard_t *shard, void *context) {
printf("[任务] 缓存刷新 分片 %d/%d: 刷新缓存键前缀 %d\n",
shard->shard_index+1, shard->total_shards, shard->shard_index);
sleep(1); // 模拟执行
}
```

6. 测试代码

```c
int main() {
printf("=== 分布式调度器测试 ===\n\n");

// 创建调度器
distributed_scheduler_t *s = scheduler_create("node-001");

// 注册任务
scheduler_register_task(s, "data-sync", "0 0 3 * * *", 4, 300, 3,
data_sync_task, NULL);

int report_id = 1001;
scheduler_register_task(s, "report-gen", "0 30 2 * * *", 2, 600, 2,
report_task, &report_id);

scheduler_register_task(s, "cache-refresh", "*/10 * * * *", 3, 60, 5,
cache_refresh_task, NULL);

// 启动心跳线程
pthread_t heartbeat_tid;
pthread_create(&heartbeat_tid, NULL, heartbeat_thread, s);

// 主调度循环
printf("\n[调度器] 开始调度循环...\n");
int loop_count = 0;
while (s->running && loop_count < 20) {
scheduler_check_tasks(s);
scheduler_failover(s);
sleep(10);
loop_count++;

// 打印任务实例状态
printf("\n=== 任务实例状态 ===\n");
pthread_mutex_lock(&s->mutex);
task_instance_t *inst = s->instances;
while (inst) {
printf("任务: %s, 状态: %d, 完成: %d/%d\n",
inst->task_name, inst->status,
inst->completed_shards, inst->shard_count);
inst = inst->next;
}
pthread_mutex_unlock(&s->mutex);
}

s->running = 0;
pthread_join(heartbeat_tid, NULL);

return 0;
}
```

---

三、编译和运行

```bash
gcc -o scheduler scheduler.c -lpthread
./scheduler
```

---

四、调度器 vs 其他方案

特性 本实现 Quartz XXL-JOB
任务分片 ✅ ✅ ✅
故障转移 ✅ ✅ ✅
Cron调度 ✅ ✅ ✅
任务编排 ❌ ✅ ✅
可视化 ❌ ✅ ✅
依赖 无 JDBC MySQL

---

五、总结

通过这篇文章,你学会了:

· 分布式调度器的核心原理
· Cron表达式解析
· 任务分片与并行执行
· 故障转移机制
· 任务状态管理
· 心跳检测

分布式调度器是大数据平台和微服务架构的核心组件。掌握它,你就理解了数据同步、报表生成、定时清理等系统的设计原理。

下一篇预告:《从零实现一个服务注册中心:Eureka与Consul》

---

评论区分享一下你用调度器解决过什么场景~

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/25 12:08:22

科伦坡租房专家系统:规则驱动的本地化决策支持框架

1. 项目概述&#xff1a;这不是一个“找房App”&#xff0c;而是一套可落地的本地化决策支持骨架 在科伦坡做租房决策&#xff0c;远比在新加坡或东京复杂得多。我第一次帮朋友找房时&#xff0c;在Pettah转了三天&#xff0c;看了17套标着“全新装修”的公寓&#xff0c;结果有…

作者头像 李华
网站建设 2026/6/25 12:08:12

/dev/urandom和/dev/random这两个文件有什么区别

这两个文件都是 Linux 系统提供的用于生成随机数的伪设备文件,它们的核心区别在于读取时的阻塞行为和底层熵池的消耗机制。 我们可以从以下几个维度来深入对比: 1. 阻塞行为(最核心的区别) /dev/random(阻塞型):它依赖于系统真实的“环境噪声”(如键盘敲击、鼠标移动…

作者头像 李华
网站建设 2026/6/25 12:08:10

微服务拆分策略:从单体到分布式的服务边界划分与演进路径

微服务拆分策略&#xff1a;从单体到分布式的服务边界划分与演进路径 一、微服务拆分的两难&#xff1a;拆早了是过度设计&#xff0c;拆晚了是技术债 某电商平台从单体架构起步&#xff0c;初期一个工程包含用户、商品、订单、支付所有模块。随着团队扩张到 30 人&#xff0c;…

作者头像 李华
网站建设 2026/6/25 12:08:04

从环路展开到交织变分:攻克强关联玻色气体自由能计算难题

1. 从“一团乱麻”到“有序编织”&#xff1a;理解相互作用玻色气体的核心挑战在凝聚态物理和量子多体物理的研究中&#xff0c;相互作用玻色气体是一个经典而又充满活力的模型。它听起来可能很学术&#xff0c;但我们可以把它想象成一个微观世界里的“人群”。想象一下&#x…

作者头像 李华
网站建设 2026/6/25 12:08:00

如何在Mac上完美读写NTFS?免费开源解决方案来了!

如何在Mac上完美读写NTFS&#xff1f;免费开源解决方案来了&#xff01; 【免费下载链接】Free-NTFS-for-Mac Nigate: An open-source NTFS utility for Mac. It supports all Mac models (Intel and Apple Silicon), providing full read-write access, mounting, and managem…

作者头像 李华