2019 九月 17 , 星期二
Home / 实用工具 / kafka / PHP kafka 客户端

PHP kafka 客户端

安装 php zookeeper 扩展 http://www.frankway.net/archives/1213

下载 https://github.com/nmred/kafka-php.git

生产者

// 连接服务
$produce = \Kafka\Produce::getInstance('120.27.***.**:****,120.27.***.**:****,120.27.***.**:****', 6000);
// 获取topic test-2 下可用的分区(test-2 下有2个分区)
$partitions = $produce->getAvailablePartitions('test-2');
var_dump($partitions);

$produce->setRequireAck(-1);
// 在分区1中 添加数据
$produce->setMessages('test-2', 0, array(
    'hello world 1',
    'hello world 2',
));
// 在分区2中 添加数据
$produce->setMessages('test-2', 1, array(
    'hello world 3',
    'hello world 4',
));
// 插入
$result = $produce->send();
var_dump($result);

7F2E98AA-1A1A-4898-9C55-0486941FD935

消费者

// 连接服务
$consumer = \Kafka\Consumer::getInstance('120.27.***.**:****,120.27.***.**:****,120.27.***.**:****');
// 设置访问用户组的名称(可以设置不同的用户组访问不同的分区内的消息)
$group = 'group2';
$consumer->setGroup($group);
$consumer->setFromOffset(true);
//$consumer->setTopic('test-2');  //直接访问topic下分区内的所有消息
// 访问topic test-2 下的分区1
$consumer->setPartition('test-2', 1);

$consumer->setMaxBytes(102400);
$result = $consumer->fetch();
foreach ($result as $topicName => $partition) {
    foreach ($partition as $partId => $messageSet) {
        foreach ($messageSet as $message) {
             var_dump((string)$message);
        }
    }
}

8CD1B648-A94C-4FF0-B51F-3B33FF8B92C4

Check Also

(工作小记)phpredis Redis::SERIALIZER_IGBINARY 自增字段

最近项目中的统计出现了问题,统 ...

发表评论

电子邮件地址不会被公开。 必填项已用 * 标注

您可以使用这些 HTML 标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>