kafka-php
kafka-php GitHub repository: https://github.com/jacky5059/kafka-php
Producer example code
send($messages, $topic);
    printf("\nSuccessfully sent %d messages (%d bytes)\n\n", count($messages), $bytes);
}
Simple consumer example code
fetch($fetchRequest);
        foreach ($messages as $msg) {
            ++$nMessages;
            echo "\nconsumed[$offset][$partialOffset][msg #{$nMessages}]: " . $msg->payload();
            $partialOffset = $messages->validBytes();
        }
        // advance the offset after consuming each message
        $offset += $messages->validBytes();
        //echo "\n---[Advancing offset to $offset]------(".date('H:i:s').")";
        unset($fetchRequest);
        //sleep(2);
    } catch (Exception $e) {
        // probably consumed all items in the queue.
        echo "\nERROR: " . get_class($e) . ': ' . $e->getMessage() . "\n" . $e->getTraceAsString() . "\n";
        sleep(2);
    }
}
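The simple consumer above tracks its position as a byte offset and advances it by `validBytes()` after each fetch, so a message that only partially fits in the fetch window is read again in full next time. Below is a minimal sketch of that bookkeeping. It does not use kafka-php: the `buildLog` and `fetchFrom` helpers and the 4-byte length-prefix framing are assumptions made up for illustration, not the library's API or the real Kafka wire format.

```php
<?php
// Hypothetical stand-in for a broker log: each message is stored as a
// 4-byte big-endian length followed by the payload (illustration only).
function buildLog(array $payloads): string
{
    $log = '';
    foreach ($payloads as $p) {
        $log .= pack('N', strlen($p)) . $p;
    }
    return $log;
}

// Return the complete messages found in at most $maxSize bytes starting at
// $offset, plus the number of bytes those messages occupy ("valid bytes").
function fetchFrom(string $log, int $offset, int $maxSize): array
{
    $chunk = substr($log, $offset, $maxSize);
    $pos = 0;
    $messages = [];
    while ($pos + 4 <= strlen($chunk)) {
        $len = unpack('N', substr($chunk, $pos, 4))[1];
        if ($pos + 4 + $len > strlen($chunk)) {
            break; // partial message at the end of the chunk: not valid yet
        }
        $messages[] = substr($chunk, $pos + 4, $len);
        $pos += 4 + $len;
    }
    return ['messages' => $messages, 'validBytes' => $pos];
}

$log = buildLog(['alpha', 'beta', 'gamma']);
$offset = 0;
$consumed = [];
while ($offset < strlen($log)) {
    $result = fetchFrom($log, $offset, 16);
    if ($result['validBytes'] === 0) {
        break; // no complete message fits in the fetch size
    }
    foreach ($result['messages'] as $payload) {
        $consumed[] = $payload;
    }
    // Advance by the bytes actually consumed, never by the requested size.
    $offset += $result['validBytes'];
}
```

Advancing by the requested fetch size instead of the valid byte count would land the next fetch in the middle of a message, which is why the example adds `validBytes()` to `$offset`.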
ZooKeeper-based consumer (zkconsumer) example code
getReadBytes() >= $maxBatchSize) {
            break;
        }
    }
} catch (Kafka_Exception_OffsetOutOfRange $exception) {
    // if we haven't received any messages, resync the offsets for the next time, then bomb out
    if ($zkconsumer->getReadBytes() == 0) {
        $zkconsumer->resyncOffsets();
        die($exception->getMessage());
    }
    // if we did receive some messages before the exception, carry on.
} catch (Kafka_Exception_Socket_Connection $exception) {
    // deal with it below
} catch (Kafka_Exception $exception) {
    // deal with it below
}

if (null !== $exception) {
    // if we haven't received any messages, bomb out
    if ($zkconsumer->getReadBytes() == 0) {
        die($exception->getMessage());
    }
    // otherwise log the error, commit the offsets for the messages read so far and return the data
}

// process the data in batches, wait for ACK
$success = doSomethingWithTheMessages($messages);

// Once the data is processed successfully, commit the byte offsets.
if ($success) {
    $zkconsumer->commitOffsets();
}

// get an approximate figure on the size of the queue
try {
    echo "\nRemaining bytes in queue: " . $consumer->getRemainingSize();
} catch (Kafka_Exception_Socket_Connection $exception) {
    die($exception->getMessage());
} catch (Kafka_Exception $exception) {
    die($exception->getMessage());
}
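The zkconsumer example reads a batch until `getReadBytes()` reaches `$maxBatchSize`, processes it, and commits offsets only when processing succeeds, so a failed batch is delivered again (at-least-once). Below is a sketch of that control flow. The `FakeConsumer` class, its `next`, `rewind` and `committed` methods, and the `readBatch` helper are hypothetical and exist only for this illustration; only the names `getReadBytes` and `commitOffsets` mirror the example above, and their behavior here is an assumption, not the library's implementation.

```php
<?php
// Hypothetical in-memory consumer for illustration; not a kafka-php class.
final class FakeConsumer
{
    private array $queue;
    private int $committed = 0; // index of the first uncommitted message
    private int $position = 0;  // index of the next message to read
    private int $readBytes = 0; // bytes read since the last commit or rewind

    public function __construct(array $queue)
    {
        $this->queue = $queue;
    }

    public function next(): ?string
    {
        if ($this->position >= count($this->queue)) {
            return null;
        }
        $message = $this->queue[$this->position++];
        $this->readBytes += strlen($message);
        return $message;
    }

    public function getReadBytes(): int
    {
        return $this->readBytes;
    }

    public function commitOffsets(): void
    {
        $this->committed = $this->position;
        $this->readBytes = 0;
    }

    // Drop uncommitted progress so the same batch is read again.
    public function rewind(): void
    {
        $this->position = $this->committed;
        $this->readBytes = 0;
    }

    public function committed(): int
    {
        return $this->committed;
    }
}

// Collect messages until the byte budget for one batch is reached.
function readBatch(FakeConsumer $consumer, int $maxBatchSize): array
{
    $messages = [];
    while (($message = $consumer->next()) !== null) {
        $messages[] = $message;
        if ($consumer->getReadBytes() >= $maxBatchSize) {
            break;
        }
    }
    return $messages;
}

$consumer = new FakeConsumer(['aaaa', 'bbbb', 'cccc']);

// First attempt: pretend downstream processing failed, so nothing is committed.
$batch = readBatch($consumer, 8);
$success = false;
if ($success) {
    $consumer->commitOffsets();
} else {
    $consumer->rewind();
}

// Second attempt: the same batch is delivered again, then committed.
$retry = readBatch($consumer, 8);
$consumer->commitOffsets();
```

Because the commit happens after processing, the processing step should tolerate seeing the same messages twice.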