RabbitMQ学习系列(五): RPC 远程过程调用

前面讲过一些RabbitMQ的安装和用法,也说了说RabbitMQ在一般的业务场景下如何使用。不知道的可以看我前面的博客,http://www.cnblogs.com/zhangweizhong/category/855479.html

不过,最近有朋友问我,RabbitMQ RPC 是干嘛的,有什么用。

其实,RabbitMQ RPC 就是通过消息队列(Message Queue)来实现rpc的功能,就是,客户端向服务端发送定义好的Queue消息,其中携带的消息就应该是服务端将要调用的方法的参数 ,并使用Propertis告诉服务端将结果返回到指定的Queue。

1.RabbitMQ RPC的特点

  • Message Queue把所有的请求消息存储起来,然后处理,和客户端解耦。
  • Message Queue引入新的结点,系统的可靠性会受Message Queue结点的影响。
  • Message Queue是异步单向的消息。发送消息设计成是不需要等待消息处理的完成。

所以对于有同步返回需求,Message Queue是个不错的方向。

2.普通PRC的特点

  • 同步调用,对于要等待返回结果/处理结果的场景,RPC是可以非常自然直觉的使用方式。当然RPC也可以是异步调用。
  • 由于等待结果,客户端会有线程消耗。

如果以异步RPC的方式使用,客户端线程消耗可以去掉。但不能做到像消息一样暂存消息请求,压力会直接传导到服务端。

3.适用场合说明

  • 希望同步得到结果的场合,RPC合适。
  • 希望使用简单,则RPC;RPC操作基于接口,使用简单,使用方式模拟本地调用。异步的方式编程比较复杂。
  • 不希望客户端受限于服务端的速度等,可以使用Message Queue。

4.RabbitMQ RPC工作流程:

 

基本概念:

Callback queue 回调队列客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to。

Correlation id 关联标识客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id属性,这样客户端在回调队列中根据correlation_id字段的值就可以分辨此响应属于哪个请求。

流程说明

  • 当客户端启动的时候,它创建一个匿名独享的回调队列。
  • 在 RPC 请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 reply_to 属性,另一个是设置唯一值的 correlation_id 属性。
  • 将请求发送到一个 rpc_queue 队列中。
  • 服务器等待请求发送到这个队列中来。当请求出现的时候,它执行他的工作并且将带有执行结果的消息发送给 reply_to 字段指定的队列。
  • 客户端等待回调队列里的数据。当有消息出现的时候,它会检查 correlation_id 属性。如果此属性的值与请求匹配,将它返回给应用

 5.完整代码:

  1. 创建两个控制台程序,作为RPC Server和RPC Client, 引用 RabbitMQ.Client,

  2. RPC Server

    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "OrderQueue", UserName = "zhangweizhong", Password = "weizhong1988", Port = 5672 };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "rpc_queue",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
                channel.BasicQos(0, 1, false);
                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume(queue: "rpc_queue",
                                     noAck: false,
                                     consumer: consumer);
                Console.WriteLine(" [x] Awaiting RPC requests");

                while (true)
                {
                    string response = null;
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                    var body = ea.Body;
                    var props = ea.BasicProperties;
                    var replyProps = channel.CreateBasicProperties();
                    replyProps.CorrelationId = props.CorrelationId;

                    try
                    {
                        var message = Encoding.UTF8.GetString(body);
                        int n = int.Parse(message);
                        Console.WriteLine(" [.] fib({0})", message);
                        response = fib(n).ToString();
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(" [.] " + e.Message);
                        response = "";
                    }
                    finally
                    {
                        var responseBytes = Encoding.UTF8.GetBytes(response);
                        channel.BasicPublish(exchange: "",
                                             routingKey: props.ReplyTo,
                                             basicProperties: replyProps,
                                             body: responseBytes);
                        channel.BasicAck(deliveryTag: ea.DeliveryTag,
                                         multiple: false);
                    }
                }
            }
        }

        /// <summary>
        /// Assumes only valid positive integer input.
        /// Don't expect this one to work for big numbers,
        /// and it's probably the slowest recursive implementation possible.
        /// </summary>
        private static int fib(int n)
        {
            if (n == 0 || n == 1)
            {
                return n;
            }

            Thread.Sleep(1000 * 10);

            return n;
        }
    }

 

  3. RPC Client

    class Program
    {
        static void Main(string[] args)
        {
            for (int i = 0; i < 10; i++)
            {
                Stopwatch watch = new Stopwatch();

                watch.Start();

                var rpcClient = new RPCClient();

                Console.WriteLine(string.Format(" [x] Requesting fib({0})", i));

                var response = rpcClient.Call(i.ToString());

                Console.WriteLine(" [.] Got '{0}'", response);

                rpcClient.Close();

                watch.Stop();

                Console.WriteLine(string.Format(" [x] Requesting complete {0} ,cost {1} ms", i, watch.Elapsed.TotalMilliseconds));
            }

            Console.WriteLine(" complete!!!! ");

            Console.ReadLine();
        }
    }

    class RPCClient
    {
        private IConnection connection;
        private IModel channel;
        private string replyQueueName;
        private QueueingBasicConsumer consumer;

        public RPCClient()
        {
            var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "OrderQueue", UserName = "zhangweizhong", Password = "weizhong1988", Port = 5672 };
            connection = factory.CreateConnection();
            channel = connection.CreateModel();
            replyQueueName = channel.QueueDeclare().QueueName;
            consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queue: replyQueueName,
                                 noAck: true,
                                 consumer: consumer);
        }

        public string Call(string message)
        {
            var corrId = Guid.NewGuid().ToString();
            var props = channel.CreateBasicProperties();
            props.ReplyTo = replyQueueName;
            props.CorrelationId = corrId;

            var messageBytes = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "",
                                 routingKey: "rpc_queue",
                                 basicProperties: props,
                                 body: messageBytes);

            while (true)
            {
                var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                if (ea.BasicProperties.CorrelationId == corrId)
                {
                    return Encoding.UTF8.GetString(ea.Body);
                }
            }
        }

        public void Close()
        {
            connection.Close();
        }
    }

  4.分别运行Server和Client

 

6.最后

  1.参照RabbitMQ官方教程的RPC,地址:http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html

  2.本文源代码下载,http://files.cnblogs.com/files/zhangweizhong/Weiz.RabbitMQ.RPC.rar

  3.博客原地址:http://fpeach.com/post/2016/12/01/RabbitMQ%E5%AD%A6%E4%B9%A0%E7%B3%BB%E5%88%97%EF%BC%88%E4%BA%94%EF%BC%89-RPC-%E8%BF%9C%E7%A8%8B%E8%BF%87%E7%A8%8B%E8%B0%83%E7%94%A8.aspx

 

时间: 2024-10-02 13:05:17

RabbitMQ学习系列(五): RPC 远程过程调用的相关文章

RabbitMQ学习系列(三): C# 如何使用 RabbitMQ

上一篇已经讲了Rabbitmq如何在Windows平台安装,还不了解如何安装的朋友,请看我前面几篇文章:RabbitMQ学习系列一:windows下安装RabbitMQ服务 , 今天就来聊聊 C# 实际开发的过程中,怎么调用 用RabbitMQ. 一.客户端 RabbitMQ.Client 是rabbitmq 官方提供的的客户端,net 版本地址 :http://www.rabbitmq.com/dotnet.html EasyNetQ 是基于RabbitMQ.Client 基础上封装的开源客户

轻松搞定RabbitMQ(七)——远程过程调用RPC

       翻译:http://www.rabbitmq.com/tutorials/tutorial-six-java.html       在第二篇博文中,我们已经了解到了如何使用工作队列来向多个消费者分散耗时任务.       但是付过我们需要在远程电脑上运行一个方法然后等待结果,该怎么办?这是不同的需求.这个模式通常叫做RPC.        本文我们将使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务器端.由于我们没有任何真实的耗时任务需要分配,所以我们将创建

Android学习Scroller(五)——详解Scroller调用过程以及View的重绘

PS: 该篇博客已经deprecated,不再维护,详情请参见  站在源码的肩膀上全解Scroller工作机制  http://blog.csdn.net/lfdfhl/article/details/53143114 MainActivity如下: package cc.ww; import android.os.Bundle; import android.widget.ImageView; import android.widget.ImageView.ScaleType; import

RabbitMQ学习系列(六): RabbitMQ 高可用集群

前面讲过一些RabbitMQ的安装和用法,也说了说RabbitMQ在一般的业务场景下如何使用.不知道的可以看我前面的博客,http://www.cnblogs.com/zhangweizhong/category/855479.html 本来一直想写一个介绍RabbitMQ高可用的集群的文章.不过,后来发现园子里,有个已经RabbitMQ大牛写了,关于高可用集群的文章了.特别巧合的是,还是以前公司的同事.所以,这里就不啰嗦.直接引用过来吧.原文地址:http://www.cnblogs.com/

RabbitMQ学习系列(一): 介绍

1. 介绍     RabbitMQ是一个由erlang开发的基于AMQP(Advanced Message Queue )协议的开源实现.用于在分布式系统中存储转发消息,在易用性.扩展性.高可用性等方面都非常的优秀.是当前最主流的消息中间件之一.     RabbitMQ的官网:http://www.rabbitmq.com   2. AMQP AMQP,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,同样,消息使用者

Spark-SparkSQL深入学习系列五(转自OopsOutOfMemory)

  /** Spark SQL源码分析系列文章*/   前几篇文章介绍了Spark SQL的Catalyst的核心运行流程.SqlParser,和Analyzer 以及核心类库TreeNode,本文将详细讲解Spark SQL的Optimizer的优化思想以及Optimizer在Catalyst里的表现方式,并加上自己的实践,对Optimizer有一个直观的认识.   Optimizer的主要职责是将Analyzer给Resolved的Logical Plan根据不同的优化策略Batch,来对语

RabbitMQ学习系列(二): RabbitMQ安装与配置

上一篇,简单介绍了RabbitMQ的情况还有一些相关的概念,这一篇,会讲讲 RabbitMQ安装与配置. 1.安装 Rabbit MQ 是建立在强大的Erlang OTP平台上,因此安装RabbitMQ之前要先安装Erlang. erlang:http://www.erlang.org/download.html rabbitmq:http://www.rabbitmq.com/download.html 注意: 1.现在先别装最新的 3.6.3 ,本人在安装完最新的版本,queue 队列有问题

Openstack Nova 源码分析 — RPC 远程调用过程

目录 目录 Nova Project Services Project 的程序入口 setuppy Nova中RPC远程过程调用 nova-compute RPC API的实现 novacomputemanager 模块 最后 Nova Project Services nova-api:捕获novaclient发送过来的HTTP请求,并且将它转换为AMQP消息,通过Queue来与别的services通信. nova-conductor:为数据库访问提供了一层安全保障. NOTE:除了nova-

WorldWind系列五:插件加载过程全解析

不得不承认World Wind的代码真的很庞大,没有太多帮助文档的前提下,一头钻进代码里肯定令你头疼的,甚至研究代码间关联仿佛是在走迷宫.我最近一直想弄明白如何在 MenuBar中加载那些插件的,WorldWind学习系列四中研究的只是特殊的三个功能加载的,那三个没有继承Plugin类,不算是插件功能加载.所以WorldWind学习系列四加载的三个是特殊情况,不是一般的插件加载.今天下午终于柳暗花明,如果你真正关注World Wind分析,那么就好好看看下面的插件加载过程全解析. 我们先看看Pl