php安装kafka扩展及php操作kafka示例

php操作kafka需要安装rdkafka扩展,而rdkafka又依赖librdkafka库,因此在安装rdkafka之前,需要先安装librdkafka,之后就可以与kafka服务进行交互了。

一、源码编译安装

1.1、安装 librdkafka 库

git clone https://github.com/edenhill/librdkafka.git

cd librdkafka/

./configure

make && make install


1.2、安装 php-kafka(rdkafka) 扩展

git clone https://github.com/arnaud-lb/php-rdkafka.git

cd php-rdkafka/

/usr/local/php/bin/phpize

./configure --with-php-config=/usr/local/php/bin/php-config

make && make install


# 在php.ini 文件中加入rdkafka扩展
extension=rdkafka.so

二、使用pecl一键安装

2.1、使用pecl一键安装rdkafka扩展

# 安装好了librdkafka库之后,就可以使用pecl的方式一键安装rdkafka扩展了

pecl install rdkafka  # 安装rdkafka扩展

三、php与kafka的交互操作代码示例

demo来源于:https://github.com/arnaud-lb/php-rdkafka#examples


正常的生产逻辑如下:

3.1、配置生产者客户端参数及创建相应的生产者实例;

/**
 * Create a producer
 */
 
$conf = new RdKafka\Conf();
$conf->set('log_level', LOG_DEBUG);
//$conf->set('debug', 'all');
$rk = new RdKafka\Producer($conf);
$rk->addBrokers("127.0.0.1");


3.2、构建主题;

/**
 * Create a topic instance from the producer
 */
 
$topic = $rk->newTopic("test");


3.3、发送消息;

/**
 * Producing messages
 * The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition.
 * 第一个参数是分区,RD_KAFKA_PARTITION_UA 表示未分配,并且由 librdkafka 选择分区。
 * 
 * The second argument are message flags and should be either 0 or RD_KAFKA_MSG_F_BLOCK to block produce on full queue.
 * 第二个参数是消息标志,为 0 或 RD_KAFKA_MSG_F_BLOCK,当队列满了时阻止生产消息。
 *
 * The message payload can be anything.
 * 消息可以是任何内容。
 */
 
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");


3.4、关闭生产者实例;

/**
 * Proper shutdown
 * This should be done prior to destroying a producer instance
 *  to make sure all queued and in-flight produce requests are completed before terminating.
 * 关闭生产者实例前需确保所有在队列中和正在生产的生产请求都已完成。
 *
 * before terminating. Use a reasonable value for $timeout_ms.
 * 在终止之前。为$timeout_ms使用合理的值。
 *
 * Not calling flush can lead to message loss!
 * 不调用flush会导致消息丢失!
 */
 
$timeout_ms = 60000; // 1 minute
$rk->flush($timeout_ms);


3.5、检验消息是否发送成功


终端开启一个消费者:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test


终端开启另一个窗口执行生产者:

233 (2).png


可看到消费者终端接收到消息:

233.png


3.6、完整代码


完整代码如下:

<?php

/**
 * Create a producer
 */
$conf = new RdKafka\Conf();
$conf->set('log_level', LOG_DEBUG);
//$conf->set('debug', 'all');
$rk = new RdKafka\Producer($conf);
$rk->addBrokers("127.0.0.1");

/**
 * Create a topic instance from the producer
 */
 
$topic = $rk->newTopic("test");

/**
 * Producing messages
 * The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition.
 * 第一个参数是分区,RD_KAFKA_PARTITION_UA 表示未分配,并且由 librdkafka 选择分区。
 * The second argument are message flags and should be either 0 or RD_KAFKA_MSG_F_BLOCK to block produce on full queue.
 * 第二个参数是消息标志,为 0 或 RD_KAFKA_MSG_F_BLOCK,当队列满了时阻止生产消息。
 * The message payload can be anything.
 * 消息可以是任何内容。
 */
 
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");

/**
 * Proper shutdown
 * This should be done prior to destroying a producer instance
 *   to make sure all queued and in-flight produce requests are completed before terminating.
 * 关闭生产者实例前需确保所有在队列中和正在生产的生产请求都已完成。
 *
 * before terminating. Use a reasonable value for $timeout_ms.
 * 在终止之前。为$timeout_ms使用合理的值。
 *
 * Not calling flush can lead to message loss!
 * 不调用flush会导致消息丢失!
 */
 
$timeout_ms = 60000; // 1 minute
$rk->flush($timeout_ms);

echo 'finished';


rdkafka扩展相关文档使用说明:https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/book.rdkafka.html


php Kafka客户端(php-rdkafka)的github地址:https://github.com/arnaud-lb/php-rdkafka  (含有安装说明及使用示例及手册等)



声明:禁止任何非法用途使用,凡因违规使用而引起的任何法律纠纷,本站概不负责。

小周博客
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

精彩评论

全部回复 0人评论 7,777人参与

loading