900字范文,内容丰富有趣,生活中的好帮手!
900字范文 > rabbitmq 存入mysql_将RabbitMQ使用者数据保存到数据库中

rabbitmq 存入mysql_将RabbitMQ使用者数据保存到数据库中

时间:2018-09-09 14:28:42

相关推荐

rabbitmq 存入mysql_将RabbitMQ使用者数据保存到数据库中

最近几天我一直在学习RabbitMQ和消息传递,我正在关注这个tutorial . 我已经设法让一切都像本教程一样工作 . 我在Laravel完成了这个项目,我创建了发布者和消费者脚本,它们按预期工作 . 到目前为止,我只使用 echo 命令显示消费者数据 . 我通过终端调用这个脚本 . 我的消费者脚本如下所示:

$host = 'secret';

$port = 5672;

$user = 'secret';

$pass = 'secret';

$vhost = 'secret';

$exchange = 'balance';

$queue = 'local_balance';

$connection = new AMQPStreamConnection($host, $port, $user, $pass, $vhost);

$channel = $connection->channel();

/*

The following code is the same both in the consumer and the producer.

In this way we are sure we always have a queue to consume from and an

exchange where to publish messages.

*/

/*

name: $queue

passive: false

durable: true // the queue will survive server restarts

exclusive: false // the queue can be accessed in other channels

auto_delete: false //the queue won't be deleted once the channel is closed.

*/

$channel->queue_declare($queue, false, true, false, false);

/*

name: $exchange

type: direct

passive: false

durable: true // the exchange will survive server restarts

auto_delete: false //the exchange won't be deleted once the channel is closed.

*/

$channel->exchange_declare($exchange, 'direct', false, true, false);

$channel->queue_bind($queue, $exchange);

/**

* @param AMQPMessage $message

*/

function process_message(AMQPMessage $message){

echo $message->body;

$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);

}

/*

queue: Queue from where to get the messages

consumer_tag: Consumer identifier

no_local: Don't receive messages published by this consumer.

no_ack: Tells the server if the consumer will acknowledge the messages.

exclusive: Request exclusive consumer access, meaning only this consumer can access the queue

nowait:

callback: A PHP Callback

*/

$consumerTag = 'local.consumer';

$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');

/**

* @param \PhpAmqpLib\Channel\AMQPChannel $channel

* @param \PhpAmqpLib\Connection\AbstractConnection $connection

*/

function shutdown($channel, $connection){

$channel->close();

$connection->close();

}

register_shutdown_function('shutdown', $channel, $connection);

while (count($channel->callbacks)) {

$channel->wait();

}

所以我的猜测是,我必须将数据库逻辑放入“process_message”函数,但到目前为止我还没有找到任何解决方法 . 如果您需要更多代码,或者对如何以不同方式解决此问题有任何建议,请告诉我们 . 任何帮助表示赞赏 .

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。