news 2026/6/18 17:51:28

Easyoole 使用rdkafka 进行kafka的创建topic创建 删除 以及数据发布 订阅

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Easyoole 使用rdkafka 进行kafka的创建topic创建 删除 以及数据发布 订阅

开发环境 Linux

首先我们需要下载安装librdkafka

https://github.com/confluentinc/librdkafka/tags?after=v2.10.0-RC2

tar -zxvf librdkafka-2.7.0.tar.gz cd librdkafka-2.7.0 ./configure make && make install

如何知道我们安装成功了呢

ldconfig -p | grep rdkafka

如果有如下输出 就说明安装成功了 。

接下来 我们安装 rdkafka 的PHP 扩展

https://pecl.php.net/package/rdkafka这里我们选择6.0.0

tar -zxvf rdkafka-6.0.0.tgz cd rdkafka-6.0.0 /usr/php/bin/phpize ./configure --with-php-config=/usr/php/bin/php-config make && make

其中phpize 和 php-config 请修改你自己的PHP 环境。

编译好后,修改php.ini 将rdkafka.so 添加到配置中,重启php 运行如下命令,看看是否扩展生效。

php --ri rdkafka

如果有输出 说明对应的kafka扩展已经安装好了 下面将进行代码层面的编写。

在EasySwooleEvent.php 文件中 加载配置目录 如下图所示:

接下来我们在Config 目录新增一个文件 如下:

Kakfa.php

<?php /** * kafka相关连接配置信息 */ return [ 'kafka' => [ "host" => [ '192.168.1.1:9092', ], "zookeeper"=>[ "192.168.1.1:2181/kafka" ], 'kafka_bin_path'=>'/usr/kafka/bin', //kafka的运行命令 本机有kafka客户端 填写 无 忽略 'is_kafka_client'=>true, //本机有kafka客户端 true 有 false 没有,这里与创建 删除topic 有关 若没有客户端,对应topic 需要提前新建好 ], ];

新增相关的服务 我们在App/Service 下面做相关服务。

新增KafkaConsumerService.php

<?php namespace App\Service; use EasySwoole\EasySwoole\Config; use RdKafka\Producer; use RdKafka\KafkaConsumer; class KafkaConsumerService { private $consumer; public function __construct(string $topicName) { $config = Config::getInstance()->getConf('kafka');//配置变量 $brokers = implode(',', $config['host']); $conf = new \RdKafka\Conf(); $conf->set('metadata.broker.list', $brokers); $conf->set('group.id', 'group_' . $topicName); $conf->set('enable.auto.commit', 'true'); $this->consumer = new KafkaConsumer($conf); $this->consumer->subscribe([$topicName]); } public function listen(callable $callback) { while (true) { $msg = $this->consumer->consume(1000); if (empty($msg)) continue; if ($msg->err === RD_KAFKA_RESP_ERR_NO_ERROR) { $callback($msg->payload); } } } }

KafkaProducerService.php

<?php namespace App\Service; use EasySwoole\EasySwoole\Config; use RdKafka\Producer; class KafkaProducerService { private static $instance = null; private $producer; private $topics = []; private function __construct() { $config = Config::getInstance()->getConf('kafka');//配置变量 $brokers = implode(',', $config['host']); $conf = new \RdKafka\Conf(); $conf->set('metadata.broker.list', $brokers); $conf->set('queue.buffering.max.ms', 5); $this->producer = new Producer($conf); } public static function getInstance(): self { if (!self::$instance) { self::$instance = new self(); } return self::$instance; } public function send(string $topicName, string $message): bool { if (!isset($this->topics[$topicName])) { $this->topics[$topicName] = $this->producer->newTopic($topicName); } $topic = $this->topics[$topicName]; $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message); $result = $this->producer->flush(10000); return $result === RD_KAFKA_RESP_ERR_NO_ERROR; } }

KafkaService.php

<?php /** * Kafka消息服务 * @author 树下水月 * @date 2025年11月27日13:23:23 */ namespace App\Service; use EasySwoole\EasySwoole\Config; use RdKafka\Producer; use RdKafka\KafkaConsumer; use RdKafka\Admin\TopicSpecification; class KafkaService { /** * 发布数据到kafka * @param string $topic topic信息 * @param array $data 发送数据 数组 * @return bool */ public static function publish(string $topic, array $data): bool { return KafkaProducerService::getInstance()->send( $topic, json_encode($data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES) ); } /** * 订阅kafka数据 * @param string $topic topic信息 * @param callable $callback * @return void */ public static function consume(string $topic, callable $callback) { $consumer = new KafkaConsumerService($topic); $consumer->listen($callback); } /** * 获取kafka 支持啥类型 * @param string $kafkaBinPath * @return string */ private function getKafkaMode(string $kafkaBinPath): string { $help = []; exec("$kafkaBinPath/kafka-topics.sh --help 2>&1", $help); // 新版 Kafka (2.0+) 支持 --bootstrap-server foreach ($help as $line) { if (strpos($line, '--bootstrap-server') !== false) { return 'bootstrap'; } } return 'zookeeper'; } /** * 获取 list 命令 */ private function buildListCommand(string $kafkaBinPath, string $addr, string $zookeeps, string $mode) { if ($mode === 'bootstrap') { return "$kafkaBinPath/kafka-topics.sh --bootstrap-server $addr --list"; } else { return "$kafkaBinPath/kafka-topics.sh --zookeeper $zookeeps --list"; } } /** * 获取 create 命令 */ private function buildCreateCommand(string $kafkaBinPath, string $addr, string $zookeeps, string $topic, int $partitions, int $replica, string $mode) { if ($mode === 'bootstrap') { return "$kafkaBinPath/kafka-topics.sh --bootstrap-server $addr --create --topic $topic --partitions $partitions --replication-factor $replica"; } else { return "$kafkaBinPath/kafka-topics.sh --zookeeper $zookeeps --create --topic $topic --partitions $partitions --replication-factor $replica"; } } /** * 获取 delete 命令 */ private function buildDeleteCommand(string $kafkaBinPath, string $addr, string $zookeepers, string $topic, string $mode) { if ($mode === 'bootstrap') { return "$kafkaBinPath/kafka-topics.sh --bootstrap-server $addr --delete --topic $topic"; } else { return "$kafkaBinPath/kafka-topics.sh --zookeeper $zookeepers --delete --topic $topic"; } } /** * 在 PHP 中通过 exec 创建 Kafka topic * @param string $topicName Topic 名称 * @param int $partitions 分区数 * @param int $replication 副本数 * @param string $kafkaBinPath Kafka bin 目录(包含 kafka-topics.sh) * @return array 返回结果 ['success'=>bool, 'message'=>string] */ public function createKafkaTopic($topicName, $partitions = 1, $replication = 1, $kafka_bootstrap, $zookeepers, $is_kafka_client = false, $kafkaBinPath = '/yisa_oe/kafka/bin') { if ($is_kafka_client) { $addr = $kafka_bootstrap; $mode = $this->getKafkaMode($kafkaBinPath); //获取模式 zookeeper bootstrap if ($this->isKafkaTopicExist($topicName, [$addr], [$zookeepers], $kafkaBinPath)) { var_dump("Topic {$topicName} 已存在,跳过创建"); return true; } $cmd = $this->buildCreateCommand($kafkaBinPath, $addr, $zookeepers, $topicName, $partitions, $replication, $mode); //执行创建 topic exec($cmd . " 2>&1", $output, $returnVar); return $returnVar === 0; } else { var_dump("当前系统内没有kafka客户端,跳过topic创建"); return true; //没有客户端 不会创建topic 直接跳过 } } /** * 检查 Topic 是否存在(兼容模式) * @param $topicName 需要检查的topic * @param array $brokers * @param array $zookeeper zookeeper * @param $kafkaBin * @return bool * @throws \Exception */ public function isKafkaTopicExist($topicName, array $brokers, array $zookeeper, $kafkaBin = '/yisa_oe/kafka/bin') { $addr = implode(',', $brokers); $zookeepers = implode(',', $zookeeper); $mode = $this->getKafkaMode($kafkaBin); //获取模式 $cmd = $this->buildListCommand($kafkaBin, $addr, $zookeepers, $mode); //获取topic 是否存在 exec($cmd . " 2>&1", $output, $returnVar); if ($returnVar !== 0) { throw new \Exception("检查 topic 失败:" . implode("\n", $output)); } return in_array($topicName, $output); } /** * 删除 Topic * @param $topicName 需要删除的topic * @param array $brokers * @param array $zookeeps zookeep 信息 * @param $is_kafka_client 是否有kafka客户端 默认false 无 * @param $kafkaBin kafka 对应的bin执行目录 * @return bool */ public function deleteKafkaTopic($topicName, array $brokers, array $zookeeper, $is_kafka_client = false, $kafkaBin = '/yisa_oe/kafka/bin') { if ($is_kafka_client) { $addr = implode(',', $brokers); $zookeeper_str = implode(',', $zookeeper); $mode = $this->getKafkaMode($kafkaBin); $cmd = $this->buildDeleteCommand($kafkaBin, $addr, $zookeeper_str, $topicName, $mode); exec($cmd . " 2>&1", $output, $returnVar); return $returnVar === 0; } else { return true; } } }

我们新增一个路由 这里直接忽略 我们以Test.php 这个控制器为例吧

<?php /** * 测评回调控制器 * @author liupeng * @email liupenga@yisa.com * @date 2025年11月18日14:22:20 */ namespace App\HttpController\Api; use EasySwoole\Http\AbstractInterface\Controller; use App\Service\KafkaService; use EasySwoole\EasySwoole\Config; use EasySwoole\ORM\Exception\Exception; use EasySwoole\Validate\Validate; use App\Model\CommonModel; use App\Model\CommonOrmModel; use EasySwoole\Validate\Functions\Length; class Evaluation extends Controller { public function onRequest(?string $action): bool { return parent::onRequest($action); // TODO: Change the autogenerated stub } public function __construct() { parent::__construct(); $this->config = Config::getInstance()->getConf('common');//配置变量 } public function tet(){ $config = Config::getInstance()->getConf('kafka');//配置变量 $kafka_bootstrap = implode(',', $config['host']); // 逗号分隔 $zookeeps = implode(',', $config['zookeeper']); //zookeep地址 $is_kafka_client = $config['is_kafka_client']; //是否存在 $kafka_bin_path = $config['kafka_bin_path']; $topicName = 'test'; $partitions = 1; $replication = 1; $kafka_service = new KafkaService(); $result = $kafka_service->createKafkaTopic($topicName, $partitions, $replication, $kafka_bootstrap, $zookeeps, $is_kafka_client, $kafka_bin_path); //创建kafka 的topic $brokers = $config['host']; //数组 $zookeeps_arr = $config['zookeeper']; //数组 var_dump(KafkaService::publish($topicName, ['ddd' => 33, 'time' => date('Y-m-d H:i:s')])); //写入kafka数据 }

这个tet方法 是创建了一个topic 名字为test的 如果存在 就跳过 如果不存在,创建,在然后就是KafkaService::publish 推送数据到对应的topic 中。

删除Topic

public function tet1(){ $config = Config::getInstance()->getConf('kafka');//配置变量 $kafka_bootstrap = implode(',', $config['host']); // 逗号分隔 $zookeeps = implode(',', $config['zookeeper']); //zookeep地址 $is_kafka_client = $config['is_kafka_client']; //是否存在 $kafka_bin_path = $config['kafka_bin_path']; $topicName = 'test'; $partitions = 1; $replication = 1; $kafka_service = new KafkaService(); $brokers = $config['host']; //数组 $zookeeps_arr = $config['zookeeper']; //数组 $result = $kafka_service->deleteKafkaTopic('Liupeng', $brokers, $zookeeps_arr, $is_kafka_client, $kafka_bin_path); //删除topic }

订阅某个kafka数据

public function dd() { $config = Config::getInstance()->getConf('kafka');//配置变量 $kafka_bootstrap = implode(',', $config['host']); // 逗号分隔 $zookeeps = implode(',', $config['zookeeper']); //zookeep地址 $is_kafka_client = $config['is_kafka_client']; //是否存在 $kafka_bin_path = $config['kafka_bin_path']; $topicName = 'kkk'; $kafka_service = new KafkaService(); $brokers = $config['host']; //数组 $zookeeps_arr = $config['zookeeper']; //数组 //消费kafka数据 一致占用 KafkaService::consume($topicName,function ($msg){ echo "收到". $msg . PHP_EOL; }); }

KafkaService::consume 订阅数据

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/18 9:53:30

公司局域网访问外网的原理

公司局域网访问外网的原理 在现代企业环境中&#xff0c;如何让内部员工既能访问公司内部资源&#xff0c;又能安全地访问互联网是一个重要的网络架构问题。本文将以通俗易懂的方式介绍公司局域网访问外网的基本原理和常见配置方式。 基本网络架构 大多数公司的网络架构通常…

作者头像 李华
网站建设 2026/6/18 6:17:25

【最新源码】医疗设备维护平台023

摘 要 随着医疗行业的不断发展&#xff0c;医疗设备的高效维护对于保障医疗服务质量和安全至关重要。一款功能全面的医疗设备维护平台应运而生。平台基于Java语言、Spring Boot框架和MySQL数据库开发&#xff0c;涵盖了医护人员、工程师、报修类型、设备类型、医疗设备、任务…

作者头像 李华
网站建设 2026/6/18 14:34:33

Kotaemon GitOps 实践:ArgoCD 自动化同步配置

Kotaemon GitOps 实践&#xff1a;ArgoCD 自动化同步配置 在当今企业级 AI 应用的部署场景中&#xff0c;一个常见的困境是&#xff1a;开发团队刚刚上线了一个优化后的 RAG 模型&#xff0c;问答准确率提升了 15%&#xff0c;但几小时后用户反馈系统回答变得混乱。排查发现&am…

作者头像 李华
网站建设 2026/6/16 4:26:48

2024年VR安全体验馆领域权威推荐:经实测的最新榜单

2024年VR安全体验馆领域权威推荐指南一、开篇引言在2024年&#xff0c;VR安全体验馆对于提升公众安全意识、进行高效安全培训等方面发挥着愈发重要的作用。然而&#xff0c;当前市场上VR安全体验馆产品质量参差不齐&#xff0c;用户在选择时面临诸多困难。根据“某行业协会”发…

作者头像 李华
网站建设 2026/6/17 23:48:58

【2025网络安全含金量最高的4本证书】:NISP、CISP、CISP-PTE、CISSP(必考证书)零基础入门到精通,看完这一篇就够了!

前言 学习网络安全&#xff0c;有4个必考证书&#xff1a;NISP、CISP、CISP-PTE、CISSP。 这4本证书分别代表了国内和国际上对信息安全专业人员不同程度的认证标准&#xff0c;对于想要提升技术和就业晋升转行人员来说非常重要&#xff01; 一、NISP&#xff08;中国信息安全测…

作者头像 李华