博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
php rabbitmq操作类及生产者和消费者实例代码 转
阅读量:6266 次
发布时间:2019-06-22

本文共 5384 字,大约阅读时间需要 17 分钟。

注意事项:

1、accept.php消费者代码需要在命令行执行

2、'username'=>'asdf','password'=>'123456' 改成自己的帐号和密码

 

RabbitMQCommand.php操作类代码

$host,'port'=>5672,'username'=>$username,'password'=>$password,'vhost'=>'/') */ public function __construct($configs = array(), $exchange_name = '', $queue_name = '', $route_key = '') { $this->setConfigs($configs); $this->exchange_name = $exchange_name; $this->queue_name = $queue_name; $this->route_key = $route_key; } private function setConfigs($configs) { if (!is_array($configs)) { throw new Exception('configs is not array'); } if (!($configs['host'] && $configs['port'] && $configs['username'] && $configs['password'])) { throw new Exception('configs is empty'); } if (empty($configs['vhost'])) { $configs['vhost'] = '/'; } $configs['login'] = $configs['username']; unset($configs['username']); $this->configs = $configs; } /* * 设置是否持久化,默认为True */ public function setDurable($durable) { $this->durable = $durable; } /* * 设置是否自动删除 */ public function setAutoDelete($autodelete) { $this->autodelete = $autodelete; } /* * 设置是否镜像 */ public function setMirror($mirror) { $this->mirror = $mirror; } /* * 打开amqp连接 */ private function open() { if (!$this->_conn) { try { $this->_conn = new AMQPConnection($this->configs); $this->_conn->connect(); $this->initConnection(); } catch (AMQPConnectionException $ex) { throw new Exception('cannot connection rabbitmq',500); } } } /* * rabbitmq连接不变 * 重置交换机,队列,路由等配置 */ public function reset($exchange_name, $queue_name, $route_key) { $this->exchange_name = $exchange_name; $this->queue_name = $queue_name; $this->route_key = $route_key; $this->initConnection(); } /* * 初始化rabbit连接的相关配置 */ private function initConnection() { if (empty($this->exchange_name) || empty($this->queue_name) || empty($this->route_key)) { throw new Exception('rabbitmq exchange_name or queue_name or route_key is empty',500); } $this->_channel = new AMQPChannel($this->_conn); $this->_exchange = new AMQPExchange($this->_channel); $this->_exchange->setName($this->exchange_name); $this->_exchange->setType(AMQP_EX_TYPE_DIRECT); if ($this->durable) $this->_exchange->setFlags(AMQP_DURABLE); if ($this->autodelete) $this->_exchange->setFlags(AMQP_AUTODELETE); $this->_exchange->declare(); $this->_queue = new AMQPQueue($this->_channel); $this->_queue->setName($this->queue_name); if ($this->durable) $this->_queue->setFlags(AMQP_DURABLE); if ($this->autodelete) $this->_queue->setFlags(AMQP_AUTODELETE); if ($this->mirror) $this->_queue->setArgument('x-ha-policy', 'all'); $this->_queue->declare(); $this->_queue->bind($this->exchange_name, $this->route_key); } public function close() { if ($this->_conn) { $this->_conn->disconnect(); } } public function __sleep() { $this->close(); return array_keys(get_object_vars($this)); } public function __destruct() { $this->close(); } /* * 生产者发送消息 */ public function send($msg) { $this->open(); if(is_array($msg)){ $msg = json_encode($msg); }else{ $msg = trim(strval($msg)); } return $this->_exchange->publish($msg, $this->route_key); } /* * 消费者 * $fun_name = array($classobj,$function) or function name string * $autoack 是否自动应答 * * function processMessage($envelope, $queue) { $msg = $envelope->getBody(); echo $msg."\n"; //处理消息 $queue->ack($envelope->getDeliveryTag());//手动应答 } */ public function run($fun_name, $autoack = True){ $this->open(); if (!$fun_name || !$this->_queue) return False; while(True){ if ($autoack) $this->_queue->consume($fun_name, AMQP_AUTOACK); else $this->_queue->consume($fun_name); } }}

  

send.php生产者代码

<?php

set_time_limit(0);
include_once('RabbitMQCommand.php');

$configs = array('host'=>'127.0.0.1','port'=>5672,'username'=>'asdf','password'=>'123456','vhost'=>'/');

$exchange_name = 'class-e-1';
$queue_name = 'class-q-1';
$route_key = 'class-r-1';
$ra = new RabbitMQCommand($configs,$exchange_name,$queue_name,$route_key);
for($i=0;$i<=100;$i++){
$ra->send(date('Y-m-d H:i:s',time()));
}
exit();

  消费者代码

'127.0.0.1','port'=>5672,'username'=>'asdf','password'=>'123456','vhost'=>'/');$exchange_name = 'class-e-1';$queue_name = 'class-q-1';$route_key = 'class-r-1';$ra = new RabbitMQCommand($configs,$exchange_name,$queue_name,$route_key);class A{ function processMessage($envelope, $queue) { $msg = $envelope->getBody(); $envelopeID = $envelope->getDeliveryTag(); $pid = posix_getpid(); file_put_contents("log{$pid}.log", $msg.'|'.$envelopeID.''."\r\n",FILE_APPEND); $queue->ack($envelopeID); }}$a = new A();$s = $ra->run(array($a,'processMessage'),false);

  

转载地址:http://tcdpa.baihongyu.com/

你可能感兴趣的文章
pgpool-II的conn_info 指针的结构
查看>>
JAVA的CALLBACK
查看>>
转:Tomcat 启动报错The APR based Apache Tomcat Native l...
查看>>
MyFirefox v2.6.2 - 打造自己的 Firefox 便携版
查看>>
各大类库的类工厂
查看>>
asp.net关于上传文件修改文件名的方法
查看>>
敏捷结果30天之第九天:使用必须、应该、可以来确定每天事情的优先级
查看>>
NFS在redhat中的一些简易应用
查看>>
mysqlbinlog查看编码问题
查看>>
进程通信(VC_Win32)
查看>>
MVP福利--利用Azure虚拟机玩Windows Server 2012
查看>>
Mac中将delete键定义为删除键
查看>>
python 函数关键参数
查看>>
ubuntu一键安装lamp
查看>>
漫谈 Clustering (1): k-means
查看>>
SQL Server 查询性能优化——索引与SARG(三)
查看>>
Oracle EBS:打开工作日历查看
查看>>
浅谈字节序(Byte Order)及其相关操作
查看>>
OSG闪存
查看>>
C#迭代器
查看>>