RabbitMQ的元数据重建

1.概述

对于RabbitMQ运维层面来说,扩容和迁移是必不可少。扩容比较简单,一般往集群中加入新的机器节点即可,不过新的机器节点中是没有消息的,如果想要新加入的节点能快速的存储消息还是需要做点小手术的。不过这是后话,本文的主要内容是迁移,而迁移的首要工作就是为新的集群重建原集群的元数据。

重建RabbitMQ元数据,说白了就是在新的集群上重新创建exchange、queue以及彼此的binding关系。当然最好连policy,vhost,users等都能重建。

本文介绍三种重建元数据的方法:

  1. 程序化重建,即编写程序制成可执行jar包。
  2. 使用WEB UI进行重建
  3. 使用http API重建

2.使用程序化重建

程序化重建之前首先要准备原集群的元数据,包括exchange、queue、bindingkey、exchange类型。
示例元数据如下(保存成文本文件metadata.txt):

exchange.migrate.demo1 queue.migrate.demo1 demo1 direct
exchange.migrate.demo2 queue.migrate.demo2 demo2 direct

注:彼此之间用空格隔开,最后一个exchange类型可以缺省,缺省值为direct。

我们的程序首先会读取这个元数据文本,然后保存在内存之中,方便之后创建。这里与这个元数据对应的类为BindingObject。详细代码如下:

public class BindingObject {
    private String channel;
    private String queue;
    private String routingKey;
    private String exchangeType;

    public BindingObject(String channel, String queue, String routingKey) {
        super();
        this.channel = channel;
        this.queue = queue;
        this.routingKey = routingKey;
        this.exchangeType = "direct";
    }

    public BindingObject(String channel, String queue, String routingKey,
            String exchangeType) {
        super();
        this.channel = channel;
        this.queue = queue;
        this.routingKey = routingKey;
        this.exchangeType = exchangeType;
    }

//此处省略各个成员变量的Getter和Setter方法

    @Override
    public String toString(){
        return "[channel="+channel+", queue="+queue+", routingKey="+routingKey+",exchangeType="+exchangeType+"]";
    }
}

之后建立主程序——RmqMetadataRebuild.java。最后打包成jar包,我们取名为rebuild.jar
在真正创建的时候调用:

java -jar rebuild.jar connection=192.168.0.2:5672,192.168.0.3:5672 filename=/root/util/metadata.txt username=root password=root vhost=/

其中有两个参数是必须的,即connection和filename。username、password、vhost可以根据实际情况修改代码来实现其缺省值。

RmqMetadataRebuild.java完整代码如下:

package com.vms.rabbitmq.rebuild;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RmqMetadataRebuild {
    private static final Logger logger = LoggerFactory.getLogger(RmqMetadataRebuild.class);

    private static List<IpPortKV> addressList = new ArrayList<IpPortKV>();
    private static String username = "root";
    private static String password = "root";
    private static String vhost = "/";
    private static String filename = null;

    private static Connection connection = null;
    private static Channel channel = null;

    //xxx.jar [connection=] [filename=] [username=] [password=] [vhost=] [filename=]
    //connection=192.168.0.2:5672,192.168.0.3:5672 filename=/root/util/metadata.txt username=root password=root vhost=/
    public static void main(String[] args) {
        logger.debug("begin rebuild rabbitmq metadata....");
        for(int i=0;i<args.length;i++){
            logger.debug("{}",args[i]);
        }

        if(!args[0].startsWith("connection=")){
            logger.error("no connection parameters!");
            printTipInfo();
            System.exit(1);
        }

        try {
            parseConnection(args[0]);
        } catch (Exception e) {
            logger.error("{}",e);
            System.exit(1);
        }

        if(addressList.size()<1){
            logger.error("no connection parameters!");
            printTipInfo();
            System.exit(1);
        }

        if(!args[1].startsWith("filename=")){
            logger.error("no rebuild metadata file!");
            printTipInfo();
            System.exit(1);
        }
        filename = args[1].substring("filename=".length());

        if(args.length>2){
            for(int i=2;i<args.length;i++){
                if(args[i].startsWith("username=")){
                    username = args[i].substring("username=".length());
                }else if(args[i].startsWith("password=")){
                    password = args[i].substring("password=".length());
                }else if(args[i].startsWith("vhost=")){
                    vhost =args[i].substring("vhost=".length());
                }
            }
        }

        logger.debug("addressList={}",addressList);
        logger.debug("filename={}",filename);
        logger.debug("username={}",username);
        logger.debug("password={}",password);
        logger.debug("vhost={}",vhost);

        RmqMetadataRebuild rmr = new RmqMetadataRebuild();

        try {
            rmr.start();
        } catch (IOException e) {
            logger.error("{}",e);
            rmr.shutdown();
        } catch (TimeoutException e) {
            logger.error("{}",e);
            rmr.shutdown();
        }

        try {
            List<BindingObject> list = rmr.getBindingList(filename);
            for(BindingObject bindingObject: list){
                String exchange = bindingObject.getChannel();
                String queue = bindingObject.getQueue();
                String rk = bindingObject.getRoutingKey();
                String exchangeType = bindingObject.getExchangeType();
                channel.exchangeDeclare(exchange, exchangeType,true,false,null);
                channel.queueDeclare(queue, true, false, false, null);
                channel.queueBind(queue, exchange, rk);
            }
            logger.info("rebuild rabbitmq metadata successfully!");

        } catch (IOException e) {
            logger.error("{}",e);
        } finally{
            rmr.shutdown();
        }
    }

    private void start() throws IOException, TimeoutException{
        int addressNum = addressList.size();
        Address[] addresses = new Address[addressNum];
        for(int i=0;i<addressNum;i++){
            IpPortKV ipPortKV = addressList.get(i);
            Address address = new Address(ipPortKV.getIp(),ipPortKV.getPort());
            addresses[i] = address;
        }

        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(vhost);
        connection = factory.newConnection(addresses);
        channel = connection.createChannel();
        logger.info("connection and channel create successfully....");
    }

    private void shutdown(){
        try {
            channel.close();
            connection.close();
        } catch (IOException e) {
            logger.error("{}",e);
        } catch (TimeoutException e) {
            logger.error("{}",e);
        }
    }

    private static void parseConnection(String connection){
        String addresses = connection.substring("connection=".length());
        String addressArray[] = addresses.split(",");
        for(String address:addressArray){
            String ipPortArray[] = address.split(":");
            IpPortKV ipPortKV = new IpPortKV(ipPortArray[0],Integer.parseInt(ipPortArray[1]));
            addressList.add(ipPortKV);
        }
    }

    private List<BindingObject> getBindingList(String fileName) throws IOException{
        List<BindingObject> list = new ArrayList<BindingObject>();

        FileInputStream fis = new FileInputStream(fileName);
        InputStreamReader isr = new InputStreamReader(fis);
        BufferedReader br = new BufferedReader(isr);
        String str = null;
        while((str = br.readLine())!=null){
            String[] tempBindArray = str.split(" ");
            if(tempBindArray.length>=3){
                BindingObject bindingObject = new BindingObject(tempBindArray[0],tempBindArray[1],tempBindArray[2]);
                if(tempBindArray.length==4){
                    bindingObject.setExchangeType(tempBindArray[3]);
                }
                list.add(bindingObject);
            }
        }
        fis.close();
        isr.close();
        br.close();

        return list;
    }

    private static void printTipInfo(){
        System.out.println("use like this: xxx.jar [connection=] [filename=] [username=] [password=] [vhost=] [filename=]");
        System.out.println("connection and filename is necessary.");
        System.out.println("use demo: connection=192.168.0.2:5672,192.168.0.3:5672 filename=/root/util/metadata.txt username=root password=root vhost=/ ");
        System.out.println("please try it again....");
    }
}

其中的IpPortKV是用来解析connection时做一下缓存过渡的。参考代码如下:

public class IpPortKV {
    private String ip;
    private int port;

//此处省略各个成员变量的Getter和Setter方法

    @Override
    public String toString(){
        return "[ip="+ip+", port="+port+"]";
    }
}

最后将项目打成可执行jar包即可。注意这里还用到了slf4j-log4j,可以删除相关的代码,也可以导入相关的jar包即可运行。

上面的代码中并没有重建users、policy、vhost等元数据,如果需要重建这些信息需要丰富一样整个代码。或者直接选用下面的方式。


3. 使用WEB UI重建

这个相对于上面的重建方式而言显得非常的简单方便。前提是开启了rabbitmq_management插件(rabbtimq-plugins enable rabbitmq_management),并且有可以WEB UI的管理员用户,具备可配置、可读、可写的权限。

在WEB UI的Overview页面下方可以找到:

只需要在原集群的WEB UI中下载(左边“Download broker definitions”)元数据配置文件,然后再导入到新集群的WEB UI中即可(上图右边“Upload broker defintions”)。

元数据配置文件是一个json文件,可以参考下面的内容:

{
    "rabbit_version": "3.5.7",
    "users": [
        {
            "name": "guest",
            "password_hash": "8oKfdYGw1Ivr91EvK53S9cR9s0=",
            "tags": "administrator"
        },
        {
            "name": "root",
            "password_hash": "XQrOsQGncx5aX/QVLSe5CmM7FE=",
            "tags": "administrator"
        }
    ],
    "vhosts": [
        {
            "name": "/"
        },
        {
            "name": "default"
        }
    ],
    "permissions": [
        {
            "user": "root",
            "vhost": "default",
            "configure": ".*",
            "write": ".*",
            "read": ".*"
        },
        {
            "user": "root",
            "vhost": "/",
            "configure": ".*",
            "write": ".*",
            "read": ".*"
        },
        {
            "user": "guest",
            "vhost": "/",
            "configure": ".*",
            "write": ".*",
            "read": ".*"
        }
    ],
    "parameters": [],
    "policies": [
        {
            "vhost": "default",
            "name": "policy.migrate",
            "pattern": "^queue",
            "apply-to": "queues",
            "definition": {
                "ha-mode": "exactly",
                "ha-params": 2,
                "ha-sync-mode": "automatic"
            },
            "priority": 0
        }
    ],
    "queues": [
        {
            "name": "queue.migrate.demo",
            "vhost": "default",
            "durable": true,
            "auto_delete": false,
            "arguments": {}
        }
    ],
    "exchanges": [
        {
            "name": "exchange.migrate.demo",
            "vhost": "default",
            "type": "direct",
            "durable": true,
            "auto_delete": false,
            "internal": false,
            "arguments": {}
        }
    ],
    "bindings": [
        {
            "source": "exchange.migrate.demo",
            "vhost": "default",
            "destination": "queue.migrate.demo",
            "destination_type": "queue",
            "routing_key": "demo",
            "arguments": {}
        }
    ]
}

由上可知,配置文件中包含rabbit_version,users, vhosts, permissions, parameters, policies, queues, exchanges,bindings等内容,概括了RabbitMQ所涉及的所有元数据配置。

如果备份集群中已有元数据与导入的元数据冲突,则导入的元数据会覆盖;如果没有冲突,则会保留。

这种重建元数据的方法简单、方便、高效,但是有个问题值得注意,那就是不同的RabbitMQ版本之间的元数据配置可能会不兼容,如果无法解决,那就只能采用第一种程序化的重建方式。如果原集群由于某种原因无法启动,那么此种方法也无法奏效,不过可以定时备份这些元数据(或者在元数据有变更时备份)来得到解决。


4. Http API的方式重建

Http API的重建方式和上面的WEB UI方式相同,都是基于RabbitMQ元数据配置文件的,只不过图形化界面操作封装了一下Http API(https://rawcdn.githack.com/rabbitmq/rabbitmq-management/rabbitmq_v3_6_10/priv/www/api/index.html)。
获取元数据配置json的命令:

wget --user root --password root http://192.168.0.2:15672/api/definitions -O /root/util/rabbit_source.json   

然后通过Http Post的方式将rabbit_source.json文件上传到新的备份集群中:

curl -T /root/util/rabbit_source.json -X POST -u root:root -H "Content-Type: application/json" http://192.168.0.3:15672/api/definitions

对于Http API的重建方式,当然也可以使用HttpClient进行操作,而非上面的命令行的形式。

时间: 2025-01-25 20:03:00

RabbitMQ的元数据重建的相关文章

消息中间件收录集

本篇主要整理工作中遇到的一些消息中间件的相关知识,包括Kafka, RabbitMQ, RocketMQ, ActiveMQ等,不排除收录其他消息中间件的可能. 这里会持续收录相关知识,包括安装.部署.使用示例.监控.运维.原理等. 所有新撰写的与中间件有关的文章都会收录与此,注意保存本文链接. Last Update Time: 2017-10-26 08:23 Update Content: RabbitMQ管理(5)--集群管理 通用 什么是Zero-Copy?(sendfile) 1.

RabbitMQ之监控(3)

确保RabbitMQ能够健康的运行还不足以让人放松警惕.考虑这样一种情况:小明为小张创建了一个队列并绑定了一个交换器,之后某人由于疏忽而阴差阳错的删除了这个队列而无人得知,最后小张在使用这个队列的时候就会报出"NOT FOUND"的错误.如果这些在测试环境中发生,那么还可以弥补.如果在实际生产环境中,如果误删了一个队列,那必然会造成不可估计的影响.此时业务方如果正在使用这个队列,正常情况下会立刻报出异常,相关人员迅可以迅速做出动作以尽可能的降低影响.试想如果是一个定时任务调用此队列,并

全方位攻略 Alfred 效率神器

Alfred 效率神器全攻略 工欲善其事必先利其器,Alfred 在 Mac 上所迸发的效率是前所未有的! 前言 作为 Mac 上最强大的效率工具,Alfred 在 Spotlight(MacOS X 自带的搜索和快速启动引擎)的基础上优化了快速启动与搜索的功能,还引入了 Workflows 等强大的扩展功能,使之成为了一个拥有无限自动化潜力的「工具平台」软件,可以用它来实现近乎一切有关自动化的想法. 我虽不是 Alfred 的骨灰级用户,但作为一个普通「玩家」,就已经在日常工作中很高频率的使用

Facebook利用PCIe闪存为廉价盘阵添加动力

http://www.aliyun.com/zixun/aggregation/1560.html">Facebook升级了 Flashcache开源工具,让管理员从配置了PCIe闪存卡的廉价 磁盘阵列中获得更高性能. Flashcache工具现在升级到了3.0版本.该工具让Facebook利用PCIe闪存卡上的高性能缓存加速访问重要数据,不需要花费高昂的成本使用全闪存阵列. Flashcache是一项回写块缓存技术,被部署为一个Linux内核设备映射的目标,使得将通用系统用于应对高流量应

【原创】RabbitMQ 相关问题汇总

[面向对象和免责声明]      本文不是面向初级 RabbitMQ 的使用者,本文面向的是对 RabbitMQ 有一定的研究,对使用中的细节问题有一定的思考,对各种模型背后的原因有强烈的探究欲望的人.本文的所有内容不保证 100% 正确,但至少是我目前为止认为正确的结论,如果您有任何高见,敬请赐教,不甚感激. [RabbitMQ 问答]       本章节主要解答一些在 RabbitMQ 使用过程中,经常被问到的问题.其实很多问题的答案都可以在各类文档里找到,建议多翻阅参考资料中给出的文档. 

WCF技术剖析之二十五:元数据(Metadata)架构体系全景展现[WS标准篇]

元数据实际上是服务终结点的描述,终结点由地址(Address).绑定(Binding)和契约(Contract)经典的ABC三要素组成.认真阅读过<WCF技术剖析(卷1)>的读者相对会对这三要素的本质有一个深刻的认识:地址决定了服务的位置并实现相应的寻址机制:契约描述了消息交换模式(Message Exchange Pattern: MEP)以及消息的结构(Schema):绑定则通过创建信道栈实现对消息的编码.传输和基于某些特殊的功能(比如实现事务.可靠传输以及基于消息的安全)对消息作出的处理

【原创】RabbitMQ 之 Federation 插件(翻译)

Federation Plugin   Introduction简介 The high-level goal of the federation plugin is to transmit messages between brokers without requiring clustering. This is useful for various reasons:federation 插件的 high level 目标是,在不同 broker 之间进行消息传递而无需建立集群:该功能在很多场景

RabbitMQ之监控(2)

本文接RabbitMQ之监控(1). 不管是通过HTTP API接口还是客户端,获取的数据都是为了提供监控视图之用,不过这一切都基于RabbitMQ服务运行完好的情况下.虽然可以通过某些其他工具或方法来检测RabbitMQ进程是否在运行(如:ps aux | grep rabbitmq),或者5672端口是否开启(如:telnet xxx.xxx.xxx.xxx 5672),但是这样依旧不能真正的评判RabbitMQ是否还具备服务外部请求的能力.这里就需要使用AMQP协议来构建一个Ping的检测

ASMCMD执行ASM元数据备份与还原

备份你的磁盘组通常来说是不需要的,因为可以简单的重建它并且还原它的内容.用户创建或定义大 量用户模板,别名与目录并且磁盘组需要重新建,你需要手动重新创建这些ASM用户对象.这了完成 这个任务,Oracle 11g引入了新的工具来备份ASM对象的元数据.新的工具, ASM Metadata Backup adn Restore(AMBR),它是ASMCMD的一个子组件.AMBR提供了使用完全相同模板,属性与别名目录结 构来重新创建之前创建过的ASM磁盘组,因此保护磁盘组结构.AMBR有两种模式:备