博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka-php
阅读量:4964 次
发布时间:2019-06-12

本文共 2310 字,大约阅读时间需要 7 分钟。

kafka-php

kafka-php的github地址  https://github.com/jacky5059/kafka-php

生产者produce示例代码

send($messages, $topic); printf("\nSuccessfully sent %d messages (%d bytes)\n\n", count($messages), $bytes);}

 

简单消费者simple consumer示例代码

fetch($fetchRequest); foreach ($messages as $msg) { ++$nMessages; echo "\nconsumed[$offset][$partialOffset][msg #{
$nMessages}]: " . $msg->payload(); $partialOffset = $messages->validBytes(); } //advance the offset after consuming each message $offset += $messages->validBytes(); //echo "\n---[Advancing offset to $offset]------(".date('H:i:s').")"; unset($fetchRequest); //sleep(2); } catch (Exception $e) { // probably consumed all items in the queue. echo "\nERROR: " . get_class($e) . ': ' . $e->getMessage()."\n".$e->getTraceAsString()."\n"; sleep(2); }}

 

基于zookeeper的消费者zkconsumer示例代码

getReadBytes() >= $maxBatchSize) { break; } }} catch (Kafka_Exception_OffsetOutOfRange $exception) { // if we haven't received any messages, resync the offsets for the next time, then bomb out if ($zkconsumer->getReadBytes() == 0) { $zkconsumer->resyncOffsets(); die($exception->getMessage()); } // if we did receive some messages before the exception, carry on.} catch (Kafka_Exception_Socket_Connection $exception) { // deal with it below} catch (Kafka_Exception $exception) { // deal with it below}if (null !== $exception) { // if we haven't received any messages, bomb out if ($zkconsumer->getReadBytes() == 0) { die($exception->getMessage()); } // otherwise log the error, commit the offsets for the messages read so far and return the data}// process the data in batches, wait for ACK$success = doSomethingWithTheMessages($messages);// Once the data is processed successfully, commit the byte offsets.if ($success) { $zkconsumer->commitOffsets();}// get an approximate figure on the size of the queuetry { echo "\nRemaining bytes in queue: " . $consumer->getRemainingSize();} catch (Kafka_Exception_Socket_Connection $exception) { die($exception->getMessage());} catch (Kafka_Exception $exception) { die($exception->getMessage());}

 

转载于:https://www.cnblogs.com/huqiang/p/5147476.html

你可能感兴趣的文章
mysql addtime() 函数
查看>>
mysql 根据日期时间查询数据
查看>>
mysql 创建时间字段
查看>>
mysql 生成随机数rand()
查看>>
mysql e的n次幂exp()
查看>>
mysql sin() 函数
查看>>
mysql upper() 函数
查看>>
mysql 子查询
查看>>
mysql 自联结
查看>>
mysql union 组合查询
查看>>
socket tcp
查看>>
spiral-matrix-ii &i 生成顺时针序列
查看>>
python set集合方法总结
查看>>
python考点
查看>>
dstat 监控时,无颜色显示
查看>>
CSS3阴影 box-shadow的使用和技巧总结
查看>>
DataMining--Python基础入门
查看>>
单片机复位电路
查看>>
php json_decode失败,返回null
查看>>
获取单选按钮选中的值
查看>>