news 2026/4/23 17:44:55

Kotlin协程flow缓冲buffer任务流,批次任务中选取优先级最高任务最先运行(一)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Kotlin协程flow缓冲buffer任务流,批次任务中选取优先级最高任务最先运行(一)

Kotlin协程flow缓冲buffer任务流,批次任务中选取优先级最高任务率先运行(一)

假设现在有一种场景,在一个任务接收器中,源源不断且不知道任务发送者何时会将新任务发送过来,每个任务都具备不同的任务优先级,任务无时无刻的进入任务缓冲池,目的是把任务缓冲池中优先级最高的那个任务挑选出来最先运行。

import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.newFixedThreadPoolContext import kotlinx.coroutines.runBlocking import java.util.UUID fun main() { val myThreadPool = newFixedThreadPoolContext(4, "my-thread") val bufferCapacity = 5 val totalTaskSize = 15 val channel = Channel<TaskInfo>() val taskList = mutableListOf<TaskInfo>() runBlocking { //接收任务 async { channel.receiveAsFlow() .buffer(bufferCapacity) .onEach { it -> //生产者 println("onEach $it at time=${System.currentTimeMillis()} ${Thread.currentThread().name}") taskList.add(it) }.flowOn(myThreadPool) .collect { it -> //消费者 println("collect $it at time=${System.currentTimeMillis()} ${Thread.currentThread().name}") val newOrderList = taskList.sortedBy { it.priority } newOrderList.forEach { print("${it.priority} ") } val lastTaskInfo = newOrderList.lastOrNull() println("\n最大优先级任务:$lastTaskInfo") taskList.remove(lastTaskInfo) loader(lastTaskInfo!!) } } //源源不断的密集发送加载任务。 async { repeat(totalTaskSize) { it -> enqueue(channel, it) } } } } private suspend fun enqueue(channel: Channel<TaskInfo>, id: Int) { val taskInfo = TaskInfo(id, (Math.random() * 9999).toInt()) println("enqueue $taskInfo") channel.send(taskInfo) } //假设这里是真正的耗时任务执行体 private suspend fun loader(info: TaskInfo) { println("load start $info @time=${System.currentTimeMillis()} ${Thread.currentThread().name}") delay(500) println("load end $info @time=${System.currentTimeMillis()} ${Thread.currentThread().name}") } private class TaskInfo { var id = 0 var priority = 0 private val taskId = UUID.randomUUID() constructor(id: Int, priority: Int) { this.id = id this.priority = priority } override fun equals(other: Any?): Boolean { return taskId == (other as TaskInfo).taskId } override fun toString(): String { return "TaskInfo(id=$id, priority=$priority)" } }

输出:

enqueue TaskInfo(id=0, priority=7947)
enqueue TaskInfo(id=1, priority=1045)
enqueue TaskInfo(id=2, priority=4478)
onEach TaskInfo(id=0, priority=7947) at time=1765979341859 my-thread-2
onEach TaskInfo(id=1, priority=1045) at time=1765979341859 my-thread-2
onEach TaskInfo(id=2, priority=4478) at time=1765979341859 my-thread-2
enqueue TaskInfo(id=3, priority=5964)
enqueue TaskInfo(id=4, priority=2658)
onEach TaskInfo(id=3, priority=5964) at time=1765979341859 my-thread-4
onEach TaskInfo(id=4, priority=2658) at time=1765979341859 my-thread-4
enqueue TaskInfo(id=5, priority=3495)
onEach TaskInfo(id=5, priority=3495) at time=1765979341860 my-thread-3
enqueue TaskInfo(id=6, priority=1461)
onEach TaskInfo(id=6, priority=1461) at time=1765979341860 my-thread-4
enqueue TaskInfo(id=7, priority=4860)
onEach TaskInfo(id=7, priority=4860) at time=1765979341860 my-thread-3
enqueue TaskInfo(id=8, priority=7226)
onEach TaskInfo(id=8, priority=7226) at time=1765979341860 my-thread-4
enqueue TaskInfo(id=9, priority=1939)
enqueue TaskInfo(id=10, priority=133)
onEach TaskInfo(id=9, priority=1939) at time=1765979341861 my-thread-3
onEach TaskInfo(id=10, priority=133) at time=1765979341861 my-thread-3
enqueue TaskInfo(id=11, priority=1818)
enqueue TaskInfo(id=12, priority=7695)
onEach TaskInfo(id=11, priority=1818) at time=1765979341861 my-thread-2
onEach TaskInfo(id=12, priority=7695) at time=1765979341861 my-thread-2
enqueue TaskInfo(id=13, priority=4365)
onEach TaskInfo(id=13, priority=4365) at time=1765979341862 my-thread-4
enqueue TaskInfo(id=14, priority=4889)
onEach TaskInfo(id=14, priority=4889) at time=1765979341862 my-thread-2
collect TaskInfo(id=0, priority=7947) at time=1765979341862 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889 5964 7226 7695 7947
最大优先级任务:TaskInfo(id=0, priority=7947)
load start TaskInfo(id=0, priority=7947) @time=1765979341887 main
load end TaskInfo(id=0, priority=7947) @time=1765979342391 main
collect TaskInfo(id=1, priority=1045) at time=1765979342392 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889 5964 7226 7695
最大优先级任务:TaskInfo(id=12, priority=7695)
load start TaskInfo(id=12, priority=7695) @time=1765979342392 main
load end TaskInfo(id=12, priority=7695) @time=1765979342901 main
collect TaskInfo(id=2, priority=4478) at time=1765979342901 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889 5964 7226
最大优先级任务:TaskInfo(id=8, priority=7226)
load start TaskInfo(id=8, priority=7226) @time=1765979342902 main
load end TaskInfo(id=8, priority=7226) @time=1765979343412 main
collect TaskInfo(id=3, priority=5964) at time=1765979343412 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889 5964
最大优先级任务:TaskInfo(id=3, priority=5964)
load start TaskInfo(id=3, priority=5964) @time=1765979343412 main
load end TaskInfo(id=3, priority=5964) @time=1765979343922 main
collect TaskInfo(id=4, priority=2658) at time=1765979343922 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889
最大优先级任务:TaskInfo(id=14, priority=4889)
load start TaskInfo(id=14, priority=4889) @time=1765979343923 main
load end TaskInfo(id=14, priority=4889) @time=1765979344433 main
collect TaskInfo(id=5, priority=3495) at time=1765979344433 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860
最大优先级任务:TaskInfo(id=7, priority=4860)
load start TaskInfo(id=7, priority=4860) @time=1765979344434 main
load end TaskInfo(id=7, priority=4860) @time=1765979344943 main
collect TaskInfo(id=6, priority=1461) at time=1765979344943 main
133 1045 1461 1818 1939 2658 3495 4365 4478
最大优先级任务:TaskInfo(id=2, priority=4478)
load start TaskInfo(id=2, priority=4478) @time=1765979344943 main
load end TaskInfo(id=2, priority=4478) @time=1765979345452 main
collect TaskInfo(id=7, priority=4860) at time=1765979345452 main
133 1045 1461 1818 1939 2658 3495 4365
最大优先级任务:TaskInfo(id=13, priority=4365)
load start TaskInfo(id=13, priority=4365) @time=1765979345452 main
load end TaskInfo(id=13, priority=4365) @time=1765979345960 main
collect TaskInfo(id=8, priority=7226) at time=1765979345960 main
133 1045 1461 1818 1939 2658 3495
最大优先级任务:TaskInfo(id=5, priority=3495)
load start TaskInfo(id=5, priority=3495) @time=1765979345960 main
load end TaskInfo(id=5, priority=3495) @time=1765979346467 main
collect TaskInfo(id=9, priority=1939) at time=1765979346467 main
133 1045 1461 1818 1939 2658
最大优先级任务:TaskInfo(id=4, priority=2658)
load start TaskInfo(id=4, priority=2658) @time=1765979346467 main
load end TaskInfo(id=4, priority=2658) @time=1765979346973 main
collect TaskInfo(id=10, priority=133) at time=1765979346973 main
133 1045 1461 1818 1939
最大优先级任务:TaskInfo(id=9, priority=1939)
load start TaskInfo(id=9, priority=1939) @time=1765979346974 main
load end TaskInfo(id=9, priority=1939) @time=1765979347482 main
collect TaskInfo(id=11, priority=1818) at time=1765979347482 main
133 1045 1461 1818
最大优先级任务:TaskInfo(id=11, priority=1818)
load start TaskInfo(id=11, priority=1818) @time=1765979347483 main
load end TaskInfo(id=11, priority=1818) @time=1765979347986 main
collect TaskInfo(id=12, priority=7695) at time=1765979347986 main
133 1045 1461
最大优先级任务:TaskInfo(id=6, priority=1461)
load start TaskInfo(id=6, priority=1461) @time=1765979347987 main
load end TaskInfo(id=6, priority=1461) @time=1765979348498 main
collect TaskInfo(id=13, priority=4365) at time=1765979348498 main
133 1045
最大优先级任务:TaskInfo(id=1, priority=1045)
load start TaskInfo(id=1, priority=1045) @time=1765979348498 main
load end TaskInfo(id=1, priority=1045) @time=1765979349006 main
collect TaskInfo(id=14, priority=4889) at time=1765979349006 main
133
最大优先级任务:TaskInfo(id=10, priority=133)
load start TaskInfo(id=10, priority=133) @time=1765979349007 main
load end TaskInfo(id=10, priority=133) @time=1765979349513 main

相关:

https://blog.csdn.net/zhangphil/article/details/154843029

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/23 9:46:20

10个理由让你爱上这份Visio形状资源库

10个理由让你爱上这份Visio形状资源库 【免费下载链接】史上最全Visio形状库分享 你是否在使用Microsoft Visio时&#xff0c;发现内置的形状库无法满足你的需求&#xff1f;你是否在寻找一个更全面、更丰富的形状库来提升你的绘图效率&#xff1f;那么&#xff0c;你来对地方了…

作者头像 李华
网站建设 2026/4/23 9:46:28

OBS多平台推流插件终极指南:3分钟实现多平台同步直播

OBS多平台推流插件终极指南&#xff1a;3分钟实现多平台同步直播 【免费下载链接】obs-multi-rtmp OBS複数サイト同時配信プラグイン 项目地址: https://gitcode.com/gh_mirrors/ob/obs-multi-rtmp 想要一次性将直播内容分发到多个平台&#xff1f;obs-multi-rtmp插件就…

作者头像 李华
网站建设 2026/4/23 9:46:50

如何快速解锁百度网盘资源:提取码智能获取终极指南

如何快速解锁百度网盘资源&#xff1a;提取码智能获取终极指南 【免费下载链接】baidupankey 项目地址: https://gitcode.com/gh_mirrors/ba/baidupankey 还在为百度网盘分享链接的提取码而烦恼吗&#xff1f;面对加密分享和隐藏密码&#xff0c;传统的人工查找方式既费…

作者头像 李华
网站建设 2026/4/23 9:50:09

NanoBanana Pro提示词大全,提示词合集这篇足够!

十一月,Google重磅发布了Gemini3.0与NanoBanana Pro,两款ai工具一经上线就在海内外获得了极高的关注度,许多用户今日在不断地进行体验,感叹NanoBanana Pro的“强悍升级”,随后各种关于NanoBanana Pro的有趣玩法分享就在全网展开。因此该文将分享全网目前热门的NanoBanana Pro提…

作者头像 李华
网站建设 2026/4/23 12:56:47

S7-1500在洁净空调控制系统中的实战应用

西门子S7-1500暖通空调制药厂洁净空调PLC程序案例&#xff0c;硬件采用西门子1500CPUET200SP接口IO模块&#xff0c;HMI采用西门子触摸屏。具体为制药厂BMS&#xff08;洁净空调自控系统&#xff09;医药洁净室程序&#xff0c;程序结构采用SCL编程。 有详细注释&#xff0c;很…

作者头像 李华