AMQP

通过php-amqplib/php-amqplib库和eventloop实现异步回调。

底层是基于stream库并不是swoole_client,write的操作依旧是堵塞的,不过这个影响已经极小,read的操作通过eventloop实现了异步读,所以可以实现异步的订阅。

比较具体的用法可以参考php-amqplib中的demo。但是有些细节需要注意,有所不同。

初始化的地点依旧是在AppServer的initAsynPools下

例子:

/**
     * 这里可以进行额外的异步连接池,比如另一组redis/mysql连接
     * @param $workerId
     * @return array
     */
    public function initAsynPools($workerId)
    {
        parent::initAsynPools($workerId);
        if($workerId==0) {
            $amqp = new AMQP('localhost',5672,'guest','guest');
            $channel = $amqp->channel();
            $channel->queue_declare('msgs', false, true, false, false);
            $channel->exchange_declare('router', 'direct', false, true, false);
            $channel->queue_bind('msgs', 'router');
            $channel->basic_consume('msgs', 'consumer', false, false, false, false, function (AMQPMessage $message)
            {
                echo "\n--------\n";
                echo $message->body;
                $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
            });
        }
    }

这里只在第一个进程中启动了AMQP客户端。

异步AMQP客户端是由Server\Asyn\AMQP\AMQP构建的,不要使用错了。

和php-amqplib库中demo不同的是,千万不要使用wait方法进行堵塞!

重要的事情说三遍,千万不要使用任何wait方法或者wait开头的方法进行堵塞,千万不要使用任何wait方法或者wait开头的方法进行堵塞 ,千万不要使用任何wait方法或者wait开头的方法进行堵塞 !

results matching ""

    No results matching ""