项目结构:
模拟5 个核心业务环节,组成流水线:
Push1:原料采购部(推送原料订单)
Pull-Push:加工车间(拉取原料 → 推送加工完成的珠宝)
Pull-Push:质检中心(拉取成品 → 推送质检分级结果)
Pull-Push:门店销售部(拉取质检成品 → 推送销售数据)
Pull:售后维保部(拉取销售数据 → 提供维保服务)
# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Push & Pull Pattern(推 - 拉)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 20:38 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : settings.py """全局配置:端口、超时、珠宝行业固定常量、消息分隔符""" # ZMQ 通信端口定义 PORT_RAW_MATERIAL = 5555 PORT_PROCESS = 5556 PORT_QUALITY = 5557 PORT_SALE = 5558 # 通信超时(ms) ZMQ_SOCKET_TIMEOUT = 3000 # 珠宝行业基础枚举常量 JEWELRY_CATEGORY = [ "钻石戒指", "黄金项链", "翡翠手镯", "铂金耳钉", "彩宝吊坠" ] QUALITY_GRADE = [ "S级(收藏)", "A级(精品)", "B级(常规)", "C级(特价)" ] # 消息分隔符,统一解析格式 MSG_SEP = "|" KV_SEP = ":" # 本地连接地址 LOCAL_TCP_ADDR = "tcp://localhost" BIND_ADDR = "tcp://*" # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Push & Pull Pattern(推 - 拉)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 20:39 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : logger.py import logging import sys from PushPullPattern.config.settings import ZMQ_SOCKET_TIMEOUT def get_module_logger(module_name: str) -> logging.Logger: """ 获取模块独立日志器,区分业务模块输出 :param module_name: :return: """ logger = logging.getLogger(module_name) logger.setLevel(logging.INFO) if not logger.handlers: formatter = logging.Formatter( fmt="%(asctime)s [%(name)s] %(levelname)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S" ) stream_handler = logging.StreamHandler(sys.stdout) stream_handler.setFormatter(formatter) logger.addHandler(stream_handler) return logger # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Push & Pull Pattern(推 - 拉)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 20:41 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : message.py """统一消息封装:所有业务消息标准化序列化、解析""" from typing import Dict from PushPullPattern.config.settings import MSG_SEP, KV_SEP class BaseMessage: """ 消息基类,统一打包/解包逻辑 """ @staticmethod def pack(data: Dict[str, str]) -> str: """ 字典转消息字符串 :param data: :return: """ parts = [] for k, v in data.items(): parts.append(f"{k}{KV_SEP}{v}") return MSG_SEP.join(parts) @staticmethod def unpack(raw_msg: str) -> Dict[str, str]: """ 消息字符串转回字典 :param raw_msg: :return: """ result = {} items = raw_msg.split(MSG_SEP) for item in items: if KV_SEP in item: k, v = item.split(KV_SEP, maxsplit=1) result[k] = v return result # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Push & Pull Pattern(推 - 拉)模式 pip install zmq # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 20:43 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : socket_factory.py import zmq from PushPullPattern.config.settings import ZMQ_SOCKET_TIMEOUT from PushPullPattern.utils.logger import get_module_logger logger = get_module_logger("SocketFactory") class ZmqSocketFactory: """ 统一创建Push/Pull套接字,资源统一管理,职责单一 """ _context = zmq.Context() @classmethod def create_push_bind(cls, port: int) -> zmq.Socket: """ 创建Push生产者,绑定端口对外提供服务 :param port: :return: """ sock = cls._context.socket(zmq.PUSH) sock.setsockopt(zmq.RCVTIMEO, ZMQ_SOCKET_TIMEOUT) sock.setsockopt(zmq.SNDTIMEO, ZMQ_SOCKET_TIMEOUT) bind_url = f"tcp://*:{port}" sock.bind(bind_url) logger.info(f"Push Socket bind success: {bind_url}") return sock @classmethod def create_pull_connect(cls, port: int) -> zmq.Socket: """ 创建Pull消费者,连接上游Push端口 :param port: :return: """ sock = cls._context.socket(zmq.PULL) sock.setsockopt(zmq.RCVTIMEO, ZMQ_SOCKET_TIMEOUT) sock.setsockopt(zmq.SNDTIMEO, ZMQ_SOCKET_TIMEOUT) connect_url = f"tcp://localhost:{port}" sock.connect(connect_url) logger.info(f"Pull Socket connect success: {connect_url}") return sock @classmethod def close_context(cls): """ 程序退出统一释放ZMQ资源 :return: """ cls._context.term() logger.info("ZMQ Context resource released")# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Push & Pull Pattern(推 - 拉)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 20:45 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : raw_material_service.py import time import random from PushPullPattern.core.socket_factory import ZmqSocketFactory from PushPullPattern.model.message import BaseMessage from PushPullPattern.config.settings import PORT_RAW_MATERIAL, JEWELRY_CATEGORY from PushPullPattern.utils.logger import get_module_logger logger = get_module_logger("RawMaterialService") class RawMaterialService: """ 原料采购(纯 Push 生产者) 业务职责:珠宝原料采购,仅推送原料订单消息 """ def __init__(self): self.push_sock = ZmqSocketFactory.create_push_bind(PORT_RAW_MATERIAL) def run_produce(self, total_count: int = 10): """ :param total_count: :return: """ logger.info("原料采购服务启动,开始生成原料订单") for seq in range(1, total_count + 1): cat = random.choice(JEWELRY_CATEGORY) order_id = f"RAW_{seq:03d}" msg_data = { "type": "原料订单", "order_id": order_id, "category": cat, "status": "已采购待加工" } msg_str = BaseMessage.pack(msg_data) self.push_sock.send_string(msg_str) logger.info(f"推送原料消息: {msg_str}") time.sleep(2) logger.info("全部原料订单推送完成") # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Push & Pull Pattern(推 - 拉)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 20:47 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : process_service.py import time import random import zmq from PushPullPattern.core.socket_factory import ZmqSocketFactory from PushPullPattern.model.message import BaseMessage from PushPullPattern.config.settings import PORT_RAW_MATERIAL, PORT_PROCESS from PushPullPattern.utils.logger import get_module_logger logger = get_module_logger("ProcessService") class ProcessService: """ 加工车间(Pull+Push 中转) 业务职责:拉取原料,加工生成成品,向下游推送 """ def __init__(self): self.pull_sock = ZmqSocketFactory.create_pull_connect(PORT_RAW_MATERIAL) self.push_sock = ZmqSocketFactory.create_push_bind(PORT_PROCESS) def run_pipeline(self): logger.info("珠宝加工车间服务启动,等待上游原料") while True: try: # 非阻塞拉取,避免卡死 raw_msg = self.pull_sock.recv_string(flags=zmq.NOBLOCK) data = BaseMessage.unpack(raw_msg) logger.info(f"接收原料消息: {raw_msg}") # 模拟加工耗时 time.sleep(random.uniform(1, 3)) raw_order = data["order_id"] finish_order = raw_order.replace("RAW", "FIN") msg_data = { "type": "成品珠宝", "order_id": finish_order, "category": data["category"], "status": "已加工待质检" } send_msg = BaseMessage.pack(msg_data) self.push_sock.send_string(send_msg) logger.info(f"加工完成推送: {send_msg}\n") except zmq.Again: # 无消息时短暂休眠,降低CPU占用 time.sleep(0.1) continue # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Push & Pull Pattern(推 - 拉)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 20:49 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : quality_service.py import random import time import zmq from PushPullPattern.core.socket_factory import ZmqSocketFactory from PushPullPattern.model.message import BaseMessage from PushPullPattern.config.settings import PORT_PROCESS, PORT_QUALITY, QUALITY_GRADE from PushPullPattern.utils.logger import get_module_logger logger = get_module_logger("QualityService") class QualityService: """ 质检中心(Pull+Push 中转) 业务职责:拉取加工成品,质检分级,推送可销售货品 """ def __init__(self): self.pull_sock = ZmqSocketFactory.create_pull_connect(PORT_PROCESS) self.push_sock = ZmqSocketFactory.create_push_bind(PORT_QUALITY) def run_pipeline(self): logger.info("质检中心服务启动,等待加工成品") while True: try: raw_msg = self.pull_sock.recv_string(flags=zmq.NOBLOCK) data = BaseMessage.unpack(raw_msg) logger.info(f"接收成品消息: {raw_msg}") time.sleep(random.uniform(0.5, 2)) grade = random.choice(QUALITY_GRADE) msg_data = { "type": "质检成品", "order_id": data["order_id"], "category": data["category"], "grade": grade, "status": "可销售" } send_msg = BaseMessage.pack(msg_data) self.push_sock.send_string(send_msg) logger.info(f"质检完成推送: {send_msg}\n") except zmq.Again: time.sleep(0.1) continue # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Push & Pull Pattern(推 - 拉)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 20:52 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : sale_service.py import time import zmq import random from PushPullPattern.core.socket_factory import ZmqSocketFactory from PushPullPattern.model.message import BaseMessage from PushPullPattern.config.settings import PORT_QUALITY, PORT_SALE from PushPullPattern.utils.logger import get_module_logger logger = get_module_logger("SaleService") class SaleService: """ 门店销售(Pull+Push 中转) 业务职责:拉取质检货品,模拟销售,推送销售单据给售后 """ def __init__(self): self.pull_sock = ZmqSocketFactory.create_pull_connect(PORT_QUALITY) self.push_sock = ZmqSocketFactory.create_push_bind(PORT_SALE) def run_pipeline(self): logger.info("门店销售服务启动,等待质检货品") while True: try: raw_msg = self.pull_sock.recv_string(flags=zmq.NOBLOCK) data = BaseMessage.unpack(raw_msg) logger.info(f"接收质检货品: {raw_msg}") time.sleep(random.uniform(1, 2.5)) sale_order = data["order_id"].replace("FIN", "SALE") price = random.randint(1000, 50000) msg_data = { "type": "销售完成", "order_id": sale_order, "category": data["category"], "price": f"{price}元", "status": "已售出" } send_msg = BaseMessage.pack(msg_data) self.push_sock.send_string(send_msg) logger.info(f"销售单据推送: {send_msg}\n") except zmq.Again: time.sleep(0.1) continue # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Push & Pull Pattern(推 - 拉)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 20:53 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : after_sale_service.py import time import zmq import random from PushPullPattern.core.socket_factory import ZmqSocketFactory from PushPullPattern.model.message import BaseMessage from PushPullPattern.config.settings import PORT_SALE from PushPullPattern.utils.logger import get_module_logger logger = get_module_logger("AfterSaleService") class AfterSaleService: """ 售后维保(纯 Pull 末端消费者) 业务职责:接收销售单据,开通终身维保档案,流水线终点 """ def __init__(self): self.pull_sock = ZmqSocketFactory.create_pull_connect(PORT_SALE) def run_consumer(self): logger.info("售后维保服务启动,等待销售单据") while True: try: raw_msg = self.pull_sock.recv_string(flags=zmq.NOBLOCK) data = BaseMessage.unpack(raw_msg) logger.info(f"接收销售单据: {raw_msg}") logger.info( f"维保档案创建成功 | 品类:{data['category']} " f"订单:{data['order_id']} | 终身维保已生效\n" ) except zmq.Again: time.sleep(0.1) continue调用:
# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Push & Pull Pattern(推 - 拉)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 20:55 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : PushPullBll.py import time import threading import atexit from PushPullPattern.core.socket_factory import ZmqSocketFactory from PushPullPattern.service import ( RawMaterialService, ProcessService, QualityService, SaleService, AfterSaleService ) from PushPullPattern.utils.logger import get_module_logger class PushPullBll(object): """ """ logger = get_module_logger("PushPullBll") # 注册退出钩子,释放ZMQ资源 @atexit.register def release_zmq(self): ZmqSocketFactory.close_context() self.logger.info("✅ ZMQ 资源已释放") def start_all_pipeline(self): """统一启动全链路多线程""" # 实例化服务 raw_service = RawMaterialService() process_service = ProcessService() quality_service = QualityService() sale_service = SaleService() after_service = AfterSaleService() # 创建线程 t_raw = threading.Thread(target=raw_service.run_produce, args=(10,), daemon=True) t_process = threading.Thread(target=process_service.run_pipeline, daemon=True) t_quality = threading.Thread(target=quality_service.run_pipeline, daemon=True) t_sale = threading.Thread(target=sale_service.run_pipeline, daemon=True) t_after = threading.Thread(target=after_service.run_consumer, daemon=True) # 启动 t_raw.start() t_process.start() t_quality.start() t_sale.start() t_after.start() self.logger.info("🚀 珠宝全链路 Push-Pull 流水线启动完成") self.logger.info("流水线运行中,按 Ctrl + C 停止\n") # 主线程保持运行 try: t_raw.join() while True: time.sleep(1) except KeyboardInterrupt: self.logger.info("🛑 程序正在退出...") def demo(self): """ :return: """ self.start_all_pipeline()输出: