AMQP
通过php-amqplib/php-amqplib库和eventloop实现异步回调。
底层是基于stream库并不是swoole_client,write的操作依旧是堵塞的,不过这个影响已经极小,read的操作通过eventloop实现了异步读,所以可以实现异步的订阅。
比较具体的用法可以参考php-amqplib中的demo。但是有些细节需要注意,有所不同。
初始化的地点依旧是在AppServer的initAsynPools下
例子:
/**
* 这里可以进行额外的异步连接池,比如另一组redis/mysql连接
* @param $workerId
* @return array
*/
public function initAsynPools($workerId)
{
parent::initAsynPools($workerId);
if($workerId==0) {
$amqp = new AMQP('localhost',5672,'guest','guest');
$channel = $amqp->channel();
$channel->queue_declare('msgs', false, true, false, false);
$channel->exchange_declare('router', 'direct', false, true, false);
$channel->queue_bind('msgs', 'router');
$channel->basic_consume('msgs', 'consumer', false, false, false, false, function (AMQPMessage $message)
{
echo "\n--------\n";
echo $message->body;
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
});
}
}
这里只在第一个进程中启动了AMQP客户端。
异步AMQP客户端是由Server\Asyn\AMQP\AMQP构建的,不要使用错了。
和php-amqplib库中demo不同的是,千万不要使用wait方法进行堵塞!
重要的事情说三遍,千万不要使用任何wait方法或者wait开头的方法进行堵塞,千万不要使用任何wait方法或者wait开头的方法进行堵塞 ,千万不要使用任何wait方法或者wait开头的方法进行堵塞 !