侧边栏壁纸
博主头像
码森林博主等级

一起走进码森林,享受编程的乐趣,发现科技的魅力,创造智能的未来!

  • 累计撰写 146 篇文章
  • 累计创建 74 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

消息中间件之RocketMQ:NameServer 的设计思想和源码解读

码森林
2022-02-07 / 0 评论 / 1 点赞 / 626 阅读 / 34,775 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-02-08,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

从本文可以获知 NameServer 设计思想以及实现原理,本系列文章都基于 RocketMQ 4.9.2 版本。

NameServer 设计思想

NameServer 是 RocketMQ 的路由注册中心,它管理着 Broker 集群向其注册的路由信息和用于给客户端查询的队列信息。

为什么放弃 ZooKeeper 而选择重新开发 NameServer?

我们知道 RocketMQ 的设计参考了 Kafka,RocketMQ 的前身是 MetaQ,MetaQ 1.x 和 MetaQ 2.x 是依赖 ZooKeeper 的,但 RocketMQ(即 MetaQ 3.x)却去掉了ZooKeeper依赖,转而采用自己开发的 NameServer。

首先我们对比一下 Kafka 和 RocketMQ 的 Broker 集群部署架构:

Kafka Broker 集群部署

在 Kafka 中,Topic 是逻辑概念,分区(Partition)是物理概念。1 个 Topic 可以设置在多个分区上,每个分区可以设置多个副本(Replication),即有 1个 Master 分区、对个 Slave 分区。

从上图看,搭建了 3 个 Broker 构成一个集群,创建了一个 Topic 名为 TopicA,有 3 个分区,分别是 Part0、Part1、Part2,绿色表示 Master,灰色表示 Slave。

在 Kafka 中发送消息只会发送到 Master 分区,再由 Master 分区同步到相同分区的 Slave,且每个分区的 Master 和 Slave 分散在不同的 Broker 上。

如向 Part0 发送消息,只会发送到 Broker0,再由 Broker0 同步到 Broker1、Broker2 的 Part0 副本中。

RocketMQ Broker 集群部署

在 RocketMQ 中国,Topic 也是逻辑概念,队列(Queue)是物理概念(相当于 Kafka 中的分区),一个 Topic 可以设置在多个 Broker 上,且可以设置多个队列,每个 Master Broker 可以设置至少一个或多个 Slave,即每个队列可以有多个副本,一个 Matser,一个或多个 Slave。

从上图看,搭建了 9 个 Broker 构成一个集群,其中有 3 个 Master,每个 Master 对应 2 个 Slave。创建一个 Topic 名为 TopicA,分别设置在 Broker0、Broker1、Broker2 上,绿色表示 Master,灰色表示 Slave。

在 RocketMQ 中发送消息会根据负载算法发送到的 Broker Master ,再由 Broker Master 同步到对应的 Broker Slave 副本中。

Kafka 与 RockeMQ 的差异

在 Kafka 中,Master 与 Slave 在同一台 Broker 上,Broker 上存在多个分区,每个分区有分为 Master 分区和 Slave 分区,分布在不同 Broker 上。在运行过程中,分区的 Master 或者 Slave 角色是通过 ZooKeeper 选举出来的。

而在 RocketMQ 中,Master 与 Slave 不在同一台 Broker 上,每台 Broker 不是 Master 就是 Slave,Broker 的 Master 或 Slave 角色是在 Broker 的配置文件中预先定义好的,在 Broker 启动之前就已经决定了,所以不需要进行选举。

ZooKeeper 选举机制的原理是少数服从多数,所以必须要通过 ZooKeeper 集群中多个实例来完成,这些实例必须相互通信,如果实例越多,网络通信就变得更加复杂且低效。

NameServer 解决了哪些问题?

NameServer 的设计让网络通信变得非常简单,从而使性能得到了极大的提升

为了避免单点故障 NameServer 也需要集群部署,但集群之间互不通信,NameServer 是无状态的,可以任意部署多个。

在 Broker 启动时会向每台 NameServer 注册自己的路由信息,因此每台 NameServer 都会保存一份完整的路由信息。NameServer 与每台 Broker 都会保持长连接并每隔 30s 心跳检测,如果 Broker 发生故障,NameServer 无法收到心跳,会在一段时间后剔除 Broker 路由信息。这也是 RocketMQ 的故障规避机制

NameServer 为了降低实现的复杂度,不会立即通知 Producer 和 Consumer

当某个 NameServer 宕机了,Broker 仍然可以向其他 NameServer 同步路由信息,Producer 和 Consumer 仍然可以动态感知 Broker 的路由信息,提高了可用性

NameServer 启动流程

在 rocketmq 源码 namesrv 项目下,我们找到启动类:org.apache.rocketmq.namesrv.NamesrvStartup。在 main 函数里主要有两步实现:

 public static NamesrvController main0(String[] args) {

        try {
            // 1、创建 NameServer 核心控制器
            NamesrvController controller = createNamesrvController(args);
            // 2、启动 NameServer 核心控制器
            start(controller);
						
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

1、创建 NameServer 核心控制器

我们主要看 NamesrvStartup.createNamesrvController 方法:

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        // 设置系统属性 RocketMQ 当前版本号
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        //PackageConflictDetect.detectFastjson();
        // 获取启动时附加参数
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
            return null;
        }
				// 创建 NameServer 业务参数对象
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
  			// 创建 NameServer 网络参数对象
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(9876);
        // 解析启动时把指定的配置文件或启动命令中的选项值,填充到 nameServerConfig,nettyServerConfig对象
        if (commandLine.hasOption('c')) {
            // -c
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);

                namesrvConfig.setConfigStorePath(file);

                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }
        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
            MixAll.printObjectProperties(console, namesrvConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            System.exit(0);
        }
        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

        log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

        MixAll.printObjectProperties(log, namesrvConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);

        // 根据 namesrvConfig 和 nettyServerConfig 创建 NamesrvController 实例
        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

        // 记住所有配置以防止丢弃
        controller.getConfiguration().registerConfig(properties);

        return controller;
    }

从上面源码分析,createNamesrvController 方法主要干了两件事情:

  1. 创建了 NamesrvConfig 业务参数对象和 NettyServerConfig 网络参数对象,并把启动时的参数解析后填充到 namesrvConfignettyServerConfig
  2. 根据 namesrvConfignettyServerConfig 创建 NamesrvController 实例。

2、NamesrvConfig 业务参数配置

public class NamesrvConfig {
    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
    private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
    private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
    private String productEnvName = "center";
    private boolean clusterTest = false;
    private boolean orderMessageEnable = false;
   // 忽略 getter 和 setter
}
  • rocketmqHome:RocketMQ 主目录,可以通过 -Drocketmq.home.dir=path 或通过设置环境变量 ROCKETMQ_HOME 来配置 RocketMQ 的主目录。
  • kvConfigPath:NameServer 存储 KV 配置属性的持久化路径。
  • configStorePath:NameServer 默认配置文件路径,不生效。NameServer 启动时如果要通过配置文件配置 NameServer 启动属性的话,请使用 -c 选项。
  • clusterTest:是否开启集群测试,默认不开启。
  • orderMessageEnable :是否支持顺序消息,默认是不支持。

3、NettyServerConfig 网络参数配置

public class NettyServerConfig implements Cloneable {
    private int listenPort = 8888;
    private int serverWorkerThreads = 8;
    private int serverCallbackExecutorThreads = 0;
    private int serverSelectorThreads = 3;
    private int serverOnewaySemaphoreValue = 256;
    private int serverAsyncSemaphoreValue = 64;
    private int serverChannelMaxIdleTimeSeconds = 120;
    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    private boolean serverPooledByteBufAllocatorEnable = true;
    private boolean useEpollNativeSelector = false;
    // 忽略 getter 和 setter
}
  • listenPort:NameServer 监听端口,该值默认会被初始化为 9876。
  • serverWorkerThreads:Netty 业务线程池线程个数,默认值是 8。
  • serverCallbackExecutorThreads:Netty public 任务线程池线程个数,Netty 网络设计,根据业务类型会创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。如果该业务类型(RequestCode)未注册线程池, 则由 public 线程池执行。默认值是 0。
  • serverSelectorThreads:IO 线程池线程个数,主要是 NameServer、Broker 端解析请求、返回相应的线程个数,这类线程主要是处理网 络请求的,解析请求包,然后转发到各个业务线程池完成具体的业务操作,然后将结果再返回调用方。默认值是 3。
  • serverOnewaySemaphoreValue:send oneway 消息请求井发度( Broker 端参数)。默认值是 256。
  • serverAsyncSemaphoreValue:异步消息发送最大并发度(Broker 端参数)。默认值是 64。
  • serverChannelMaxIdleTimeSeconds:网络连接最大空闲时间,默认 120s。如果连接空闲时间超过该参数设置的值,连接将被关闭。
  • serverSocketSndBufSize:网络 socket 发送缓存区大小,默认 64k。
  • serverSocketRcvBufSize:网络 socket 接收缓存区大小,默认 64k。
  • serverPooledByteBufAllocatorEnable:ByteBuffer 是否开启缓存,建议开启。默认开启。
  • useEpollNativeSelector:是否启用 Epoll IO模型,Linux 环境建议开启。默认关闭。

4、创建 NamesrvController 实例

加载完配置文件之后,创建 NamesrvController 实例,并将 namesrvConfignettyServerConfig 配置设置到 NamesrvController 对象中。同时还创建了 KVConfigManagerRouteInfoManagerBrokerHousekeepingServiceConfiguration 4个对象并设置到 NamesrvController 对象中。

public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        this.namesrvConfig = namesrvConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.kvConfigManager = new KVConfigManager(this);
        this.routeInfoManager = new RouteInfoManager();
        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}

KVConfigManager

其中 KVConfigManager 主要用来文件持久化保存管理配置的键值对。采用的是 HashMap 来存配置项,key 为 namespace,value 为 HashMap,存储的值采用的是 String。采用 ReentrantReadWriteLock 进行并发控制,支持序列 JSON 到磁盘,也支持从磁盘文件加载到内存。

public class KVConfigManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    private final NamesrvController namesrvController;
		 /**
     * ReadWriteLock 维护一对关联的锁,一个用于只读操作,一个用于写入。只要没有写者,读锁可能被多个读线程同时持有。写锁是独占的。
     */
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
        new HashMap<String, HashMap<String, String>>();

    public KVConfigManager(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }
		/**
     * 加载 kvConfig.json 配置文件,并把 json 加载到 hashmap
     */
    public void load() {
      ...
    }
    /**
     * 添加配置,先加写锁,put 到 hashmap 内存存储,再进行json序列化持久化到磁盘 kvConfig.json 文件
     *
     * @param namespace
     * @param key
     * @param value
     */
    public void putKVConfig(final String namespace, final String key, final String value) {
        ...
        this.persist();
    }
    /**
     * 内存持久化,加读锁
     */
    public void persist() {
        ...
    }

    /**
     * 删除配置,先加写锁,put remove hashmap 内存存储,再进行json序列化持久化到磁盘 kvConfig.json 文件
     *
     * @param namespace
     * @param key
     */
    public void deleteKVConfig(final String namespace, final String key) {
        ...
        this.persist();
    }
    /**
     * 根据 namespace 获取 k-v List 的byte数组,加读锁
     * @param namespace
     * @return
     */
    public byte[] getKVListByNamespace(final String namespace) {
       ...
    }
		/**
     * 获取配置,加读锁
     * @param namespace
     * @param key
     * @return
     */
    public String getKVConfig(final String namespace, final String key) {
        ...
    }
		 /**
     * 定期打印所有,加读锁
     */
    public void printAllPeriodically() {
       ...
    }
}

persist 持久化过程主要看 MixAll.string2File() 方法。

public static void string2File(final String str, final String fileName) throws IOException {
			 // 1、为新版本数据创建临时文件
        String tmpFile = fileName + ".tmp";
        string2FileNotSafe(str, tmpFile);
        // 2、为上一个版本数据创建备份文件
        String bakFile = fileName + ".bak";
        String prevContent = file2String(fileName);
        if (prevContent != null) {
            string2FileNotSafe(prevContent, bakFile);
        }
        // 3、删除上一个版本文件
        File file = new File(fileName);
        file.delete();
        // 4、重命名tmp临时文件为正式文件
        file = new File(tmpFile);
        file.renameTo(new File(fileName));
}

RouteInfoManager

RouteInfoManager 主要用来保存路由信息。这个到后面对路由管理解读时再详细看~

BrokerHousekeepingService

BrokerHousekeepingService是 netty 的监听器,处理异步事件的调用逻辑。

public class BrokerHousekeepingService implements ChannelEventListener {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final NamesrvController namesrvController;

    public BrokerHousekeepingService(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }
		/** 通道连接监听 **/
    @Override
    public void onChannelConnect(String remoteAddr, Channel channel) {
    }
	  /** 通道关闭监听 **/
    @Override
    public void onChannelClose(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }
    /** 通道异常监听 **/
    @Override
    public void onChannelException(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }
		/** 通道空闲监听 **/
    @Override
    public void onChannelIdle(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }
}

Configuration

Configuration 是对 namesrvConfignettyServerConfig 统一管理配置并持久化保存,持久化路径即是 configStorePath

5、启动 NameServer 核心控制器

我们主要看 NamesrvStartup.start 方法:

public static NamesrvController start(final NamesrvController controller) throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }
        // 调用 controller.initialize() 方法进行初始化
        boolean initResult = controller.initialize();
        if (!initResult) {
            // 初始化失败,结束
            controller.shutdown();
            System.exit(-3);
        }
				// 在进程关闭时,调用 controller.shutdown() 方法
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                // 
                controller.shutdown();
                return null;
            }
        }));
				// 调用 controller.start() 方法
        controller.start();

        return controller;
    }

根据启动属性创建 NamesrvController 实例后,通过 controller.initialize() 方法初始化实例。

public boolean initialize() {
        // 加载 KV 配置
        this.kvConfigManager.load();
        // 根据 nettyServerConfig 配置和 brokerHousekeepingService事件监听创建 NettyServer 网络处理对象
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        // 初始化负责 Netty 网络处理的线程池
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
				// 根据 remotingServer 对象和 remotingExecutor 线程池注册处理器
        this.registerProcessor();
        // 开启定时任务,NameServer 每隔 10 秒扫秒一次 Broker,移除处于未存活状态的 Broker(即 RocketMQ 心跳监测机制)
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
        // 开启定时任务,NameServer 每隔 10 分钟打印一次 KV 配置
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // 注册一个监听器以重新加载 SslContext
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;
                        @Override
                        public void onChanged(String path) {
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }
                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                        }
                    });
            } catch (Exception e) {
                log.warn("FileWatchService created error, can't load the certificate dynamically");
            }
        }
        return true;
    }

最后注册 JVM 钩子函数并启动服务器,以便监听 Broker、Producer 的网络请求。

Runtime.getRuntime().addShutdownHook() 方法的作用

主要是代码中使用了线程池,注册 JVM 钩子函数,当 JVM 关闭的时候,会执行系统中已经设置的所有通过方法 addShutdownHook 添加的钩子,当系统执行完这些钩子后,JVM 才会关闭。所以这些钩子可以在 JVM 关闭的时候进行对象销毁、内存清理等操作,及时释放资源。

NameServer 路由管理

NameServer 主要作用是为消息生产者和消息消费者提供关于主题 Topic 的路由信息,那么 NameServer 需要存储路由的基础信息,还要能够管理 Broker 节点,包括路由注册、路由删除等功能。

1、路由元信息

在创建时 NamesrvController 实例时会创建一个 org.apache.rocketmq.namesrv.routeinfo.RoutelnfoManager对象,我们详细的看下它有哪些信息:

public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // Topic 消息队列路由信息,消息发送时根据路由表进行负载均衡
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    // Broker 基础信息,包含 brokerName、所属集群名称、主备 Broker 地址
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    // Broker 集群信息,存储集群中所有 Broker 名称
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    // Broker 状态信息。NameServer 每次收到心跳包时会替换该信息
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    // Broker上的 FilterServer 列表,用于类模式消息过滤
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

    public RouteInfoManager() {
        this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
        this.brokerAddrTable = new HashMap<String, BrokerData>(128);
        this.clusterAddrTable = new HashMap<String, Set<String>>(32);
        this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
        this.filterServerTable = new HashMap<String, List<String>>(256);
    }
    ...
}

RouteInfoManager 主要定义了5个 HashMap 用于存储信息:

  • topicQueueTable:Topic 消息队列路由信息,消息发送时根据路由表进行负载均衡。
  • brokerAddrTable:Broker 基础信息,包含 brokerName、所属集群名称、主备 Broker 地址。
  • clusterAddrTable:Broker 集群信息,存储集群中所有 Broker 名称。
  • brokerLiveTable:Broker 状态信息。NameServer 每次收到心跳包时会替换该信息。
  • filterServerTable:Broker上的 FilterServer 列表,用于类模式消息过滤。

看下 QueueDataBrokerDataBrokerLiveInfo具体有哪些信息:

QueueData

public class QueueData implements Comparable<QueueData> {
    /**
     * broker 名称
     */
    private String brokerName;
    /**
     * 读队列数量
     */
    private int readQueueNums;
    /**
     * 写队列数量
     */
    private int writeQueueNums;
    /**
     * 读写权限,具体含义参考 PermName 1-继承 2-可写 4-可读 6-可读写(默认)7-可读写继承
     */
    private int perm;
    /**
     * topic 同步标志,具体含义参考
     */
    private int topicSysFlag;
    ...
    
}

BrokerData

public class BrokerData implements Comparable<BrokerData> {
     /**
     * 集群名称
     */
    private String cluster;
    /**
     * broker 名称
     */
    private String brokerName;
    /**
     * 同一个brokerName下可以有一个Master和多个Slave,所以brokerAddrs是一个集合 
     * brokerld=0表示 Master,大于0表示 Slave
     */
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
    ...
}    

BrokerLiveInfo

class BrokerLiveInfo {
   /**
     * 上次收到 Broker 心跳包的时间
     */
    private long lastUpdateTimestamp;
    /**
     *  版本号 
     */
    private DataVersion dataVersion;
    /**
     * Netty 的 Channel 连接
     */
    private Channel channel;
    /**
     *   HA Broker的地址
     *   是Slave从Master拉取数据时链接的地址,由brokerIp2+HA端口构成
     */
    private String haServerAddr;
    ...
}

2、路由注册

RocketMQ 的路由注册是通过 Broker 与 NameServer 的心跳机制实现的。Broker 在启动时会向集群内所有的 NameServer 发送心跳,并且默认每隔 30s 发送心跳包。在 org.apache.rocketmq.broker.BrokerController.start()中:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
  // brokerConfig.getRegisterNameServerPeriod() 默认为30s,
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

registerBrokerAll 方法是被 synchronized 修饰,所以同一时间只会有一个线程执行:

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
        TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
        // 如果 Broker 不是只写权限或者只读权限,就把 topConfig 进行更新
        if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
            || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
            ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
            for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
                TopicConfig tmp =
                    new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                        this.brokerConfig.getBrokerPermission());
                topicConfigTable.put(topicConfig.getTopicName(), tmp);
            }
            topicConfigWrapper.setTopicConfigTable(topicConfigTable);
        }
        // 比较关键的地方。先判断是否需要注册,然后调用 doRegisterBrokerAll 方法去注册。
        if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.brokerConfig.getRegisterBrokerTimeoutMills())) {
            doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
        }
    }

doRegisterBrokerAll 做了哪些:

private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
        TopicConfigSerializeWrapper topicConfigWrapper) {
        // 通过 brokerOuterAPI 向所有 NameServer 注册,并返回结果,所以这里返回的是 List
        List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.getHAServerAddr(),
            topicConfigWrapper,
            this.filterServerManager.buildNewFilterServerList(),
            oneway,
            this.brokerConfig.getRegisterBrokerTimeoutMills(),
            this.brokerConfig.isCompressedRegister());
        // 如果结果大于0,说明注册成功
        if (registerBrokerResultList.size() > 0) {
            RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
            if (registerBrokerResult != null) {
                if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                    // 更新master地址本地缓存
                    this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
                }
                // 同步设置slave的master地址
                this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());

                if (checkOrderConfig) {
                    // 更新 topic 顺序消息的配置
                    this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                }
            }
        }
    }

我们主要看 brokerOuterAPI.registerBrokerAll,该方法主要是遍历 NameServer 列表, Broker消息服务器依次向 NameServer 注册,即发送心跳包:

public List<RegisterBrokerResult> registerBrokerAll(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final boolean oneway,
        final int timeoutMills,
        final boolean compressed) {
        final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
           // 封装请求头
            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            // broker 地址
            requestHeader.setBrokerAddr(brokerAddr);
            // broker Id:0 代表 Master,大于0 代表 Slave
            requestHeader.setBrokerId(brokerId);
            // broker 名称
            requestHeader.setBrokerName(brokerName);
            // 集群名称
            requestHeader.setClusterName(clusterName);
            // master 地址,第一次请求时该值为空,slave 向 NameServer 注册后返回
            requestHeader.setHaServerAddr(haServerAddr);
            requestHeader.setCompressed(compressed);
            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            // 主题配置
            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
            // 消息过滤服务器列表
            requestBody.setFilterServerList(filterServerList);
            final byte[] body = requestBody.encode(compressed);
            final int bodyCrc32 = UtilAll.crc32(body);
            requestHeader.setBodyCrc32(bodyCrc32);
            // 使用 CountDownLatch 做计数器,当所有线程都执行完成后再返回结果
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            // 遍历 NameServer 列表, Broker消息服务器依次向 NameServer 注册,即发送心跳包
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                            if (result != null) {
                                registerBrokerResultList.add(result);
                            }
                            log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                        } catch (Exception e) {
                            log.warn("registerBroker Exception, {}", namesrvAddr, e);
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                });
            }
            try {
                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
        }
        return registerBrokerResultList;
    }

而实际向 NameServer 网络请求的是 registerBroker:

private RegisterBrokerResult registerBroker(
        final String namesrvAddr,
        final boolean oneway,
        final int timeoutMills,
        final RegisterBrokerRequestHeader requestHeader,
        final byte[] body
    ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
        InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
        request.setBody(body);
        // 单路注册,无返回值
        if (oneway) {
            try {
                this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
            } catch (RemotingTooMuchRequestException e) {
                // Ignore
            }
            return null;
        }
        // 同步注册,有返回值
        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                RegisterBrokerResponseHeader responseHeader =
                    (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
                RegisterBrokerResult result = new RegisterBrokerResult();
                result.setMasterAddr(responseHeader.getMasterAddr());
                result.setHaServerAddr(responseHeader.getHaServerAddr());
                if (response.getBody() != null) {
                    result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
                }
                return result;
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
}

以上就是 Broker 发送心跳向 NameServer 注册的过程,下面我们来看 NameServer 是如何处理心跳请求的:

public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            try {
                // 加一个写锁,防止并发修改 RoutelnfoManager 中的路由表信息
                this.lock.writeLock().lockInterruptibly();
                // 判断 Broker 所属集群是否存在,如果不存在,则创建
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                // 把 broker 名称加入到 Broker 集群中
                brokerNames.add(brokerName);
                boolean registerFirst = false;
                // 根据 broker 名称从 brokerAddrTable 获取 BrokerData
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null == brokerData) {
                    // 如果 BrokerData 为空,说明是第一次注册,设置 registerFirst 为 true,并创建一个 BrokerData,放入 brokerAddrTable
                    registerFirst = true;
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                // 同一个 IP:PORT 在 brokerAddrTable 中只能有一条记录,所以存在相同的地址则把本次请求的 brokerId 和 brokerAddr 替换原先的 brokerData
                Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
                Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<Long, String> item = it.next();
                    if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                        it.remove();
                    }
                }
                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                registerFirst = registerFirst || (null == oldAddr);
                // 对 Broker 为 Master 的 topic 配置进行创建或者更新,维护 topicQueueData
                if (null != topicConfigWrapper
                    && MixAll.MASTER_ID == brokerId) {
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                        || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable =
                            topicConfigWrapper.getTopicConfigTable();
                        if (tcTable != null) {
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }
                // 更新 BrokerLiveInfo,存活 Broker 信息表,BrokeLiveInfo 是执行路由删除的重要依据
                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                    new BrokerLiveInfo(
                        System.currentTimeMillis(),
                        topicConfigWrapper.getDataVersion(),
                        channel,
                        haServerAddr));
                if (null == prevBrokerLiveInfo) {
                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                }
                // 注册 Broker 的过滤器 Server 地址列表 ,一个 Broker上会关联多个 FilterServer 消息过滤服务器,
                if (filterServerList != null) {
                    if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
                    } else {
                        this.filterServerTable.put(brokerAddr, filterServerList);
                    }
                }
                  // 如果此 Broker 为 Master 节点,则需要查找该 Broker 的 Master 的节点信息,并更新对应的 masterAddr 属性
                if (MixAll.MASTER_ID != brokerId) {
                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                    if (masterAddr != null) {
                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                        if (brokerLiveInfo != null) {
                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                            result.setMasterAddr(masterAddr);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }
        return result;
}

3、路由剔除

RocketMQ 会在两种请求触发对路由的剔除:

  • NameServer 会定时扫描 brokerLiveTable,检测上一次心跳时间与当前系统时间的时间差,如果时间差大于120s,则会移除该 Broker 信息。
  • Broker 在正常关闭的情况下,会执行 unregisterBroker指令。

我们先看第一种情况,在 NameServer 实例初始化时,会开启一个定时任务:

 // 开启定时任务,NameServer 每隔 10 秒扫码一次 Broker,移除处于未存活状态的 Broker(即 RocketMQ 心跳监测机制)
 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 						@Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }	           
}, 5, 10, TimeUnit.SECONDS);       

那么主要看 RouteInfoManager.scanNotActiveBroker()方法:

public void scanNotActiveBroker() {
        // 遍历 brokerLiveTable
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            // 判断 BrokerLiveInfo 中的 lastUpdateTimestamp 与当前系统时间的时间差
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
               // 如果大于120s 则通过 RemotingUtil.closeChannel(channel) 关闭网络连接,并移除
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                // 触发 onChannelDestroy 事件
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
}

onChannelDestroy 主要通过加写锁对 brokerLiveTablefilterServerTablebrokerAddrTableclusterAddrTabletopicQueueTable 中涉及当前 Broker 的信息进行移除:

 public void onChannelDestroy(String remoteAddr, Channel channel) {
        String brokerAddrFound = null;
        if (channel != null) {
            try {
                try {
                    // 加读锁,多个线程可以并发读取
                    this.lock.readLock().lockInterruptibly();
                    Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                        this.brokerLiveTable.entrySet().iterator();
                    while (itBrokerLiveTable.hasNext()) {
                        Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                        if (entry.getValue().getChannel() == channel) {
                            brokerAddrFound = entry.getKey();
                            break;
                        }
                    }
                } finally {
                    this.lock.readLock().unlock();
                }
            } catch (Exception e) {
                log.error("onChannelDestroy Exception", e);
            }
        }

        if (null == brokerAddrFound) {
            brokerAddrFound = remoteAddr;
        } else {
            log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
        }

        if (brokerAddrFound != null && brokerAddrFound.length() > 0) {

            try {
                try {
                    // 申请写锁
                    this.lock.writeLock().lockInterruptibly();
                    // 维护 brokerLiveTable, 根据 brokerAddress 从 brokerLiveTable 中移除 Broker 信息
                    this.brokerLiveTable.remove(brokerAddrFound);
                    // 维护 filterServerTable, 根据 brokerAddress 从 filterServerTable 中移除 Broker 信息
                    this.filterServerTable.remove(brokerAddrFound);
                    // 维护 brokerAddrTable
                    String brokerNameFound = null;
                    boolean removeBrokerName = false;
                    Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                        this.brokerAddrTable.entrySet().iterator();
                    while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                        BrokerData brokerData = itBrokerAddrTable.next().getValue();
                        Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                        while (it.hasNext()) {
                            Entry<Long, String> entry = it.next();
                            Long brokerId = entry.getKey();
                            String brokerAddr = entry.getValue();
                            // 从 BrokerData 的 HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs 中
                            // 找到具体的 Broker,从 BrokerData 中移除
                            if (brokerAddr.equals(brokerAddrFound)) {
                                brokerNameFound = brokerData.getBrokerName();
                                it.remove();
                                log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                    brokerId, brokerAddr);
                                break;
                            }
                        }
                        // 如果移除后在 BrokerData 中不再包含其他 Broker,则在 brokerAddrTable 中移除该 brokerName 对应的条目
                        if (brokerData.getBrokerAddrs().isEmpty()) {
                            removeBrokerName = true;
                            itBrokerAddrTable.remove();
                            log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                                brokerData.getBrokerName());
                        }
                    }
                    // 维护 clusterAddrTable
                    if (brokerNameFound != null && removeBrokerName) {
                        Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                        while (it.hasNext()) {
                            Entry<String, Set<String>> entry = it.next();
                            String clusterName = entry.getKey();
                            Set<String> brokerNames = entry.getValue();
                            // 根据 BrokerName,从 clusterAddrTable 中找到 Broker 并从集群中移除
                            boolean removed = brokerNames.remove(brokerNameFound);
                            if (removed) {
                                log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                    brokerNameFound, clusterName);
                                // 如果移除后集群中不再包含任何 Broker,则将该集群从 clusterAddrTable 中移除
                                if (brokerNames.isEmpty()) {
                                    log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                        clusterName);
                                    it.remove();
                                }

                                break;
                            }
                        }
                    }
                    // 维护 topicQueueTable
                    if (removeBrokerName) {
                        // 遍历 topicQueueTable
                        Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                            this.topicQueueTable.entrySet().iterator();
                        while (itTopicQueueTable.hasNext()) {
                            Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                            String topic = entry.getKey();
                            List<QueueData> queueDataList = entry.getValue();

                            Iterator<QueueData> itQueueData = queueDataList.iterator();
                            while (itQueueData.hasNext()) {
                                QueueData queueData = itQueueData.next();
                                // 如果 QueueData 中包含当前 Broker 的队列,则移除
                                if (queueData.getBrokerName().equals(brokerNameFound)) {
                                    itQueueData.remove();
                                    log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                        topic, queueData);
                                }
                            }
                            // 如果移除后 List<QueueData> 队列为空,则从 topicQueueTable 中移除该队列
                            if (queueDataList.isEmpty()) {
                                itTopicQueueTable.remove();
                                log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                    topic);
                            }
                        }
                    }
                } finally {
                    // 释放锁,完成路由删除
                    this.lock.writeLock().unlock();
                }
            } catch (Exception e) {
                log.error("onChannelDestroy Exception", e);
            }
        }
    }

Broker 在正常关闭的情况下,会执行 unregisterBroker 指令:

public void unregisterBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId) {
        try {
            try {
                // 加写锁
                this.lock.writeLock().lockInterruptibly();
                // 维护 brokerLiveTable,根据 brokerAddress 移除对应信息
                BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
                log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
                    brokerLiveInfo != null ? "OK" : "Failed",
                    brokerAddr
                );
                // 维护 filterServerTable,根据 brokerAddress 移除对应信息
                this.filterServerTable.remove(brokerAddr);

                // 维护 brokerAddrTable,根据 brokerName 获取 BrokerData
                boolean removeBrokerName = false;
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null != brokerData) {
                    // 在 BrokerData 的 HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs 中,根据 brokerId 移除
                    String addr = brokerData.getBrokerAddrs().remove(brokerId);
                    log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
                        addr != null ? "OK" : "Failed",
                        brokerAddr
                    );
                    // 如果移除后,如果 brokerAddrs 为空,则移除该 brokerData
                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        this.brokerAddrTable.remove(brokerName);
                        log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
                            brokerName
                        );
                        removeBrokerName = true;
                    }
                }
                // 维护 clusterAddrTable,根据 clusterName 获取 BrokerName 集合
                if (removeBrokerName) {
                    Set<String> nameSet = this.clusterAddrTable.get(clusterName);
                    if (nameSet != null) {
                        // 移除 brokerName
                        boolean removed = nameSet.remove(brokerName);
                        log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
                            removed ? "OK" : "Failed",
                            brokerName);
                        // 如果移除后 nameSet 为空,则根据 clusterName 移除整个 nameSet
                        if (nameSet.isEmpty()) {
                            this.clusterAddrTable.remove(clusterName);
                            log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
                                clusterName
                            );
                        }
                    }
                    // 维护 topicQueueTable,根据 brokerName 移除 List<QueueData>
                    this.removeTopicByBrokerName(brokerName);
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("unregisterBroker Exception", e);
        }
}

4、路由发现

RocketMQ 路由发现是非实时的,当 Topic 路由发生变化时,NameServer 不主动推送给客户端,而是客户端定时拉取 Topic 最新的路由信息。客户端通过 Topic 名称向 NameServer 发送 GET_ROUTEINFO_BY_TOPIC 命令的请求拉取最新路由信息。NameServer 所有的网络请求处理都在 org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor 下,这里主要看 getRouteInfoByTopic 方法:

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final GetRouteInfoRequestHeader requestHeader =
            (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
				// 调用 RouterlnfoManager 的方法,从路由表 topicQueueTable、 brokerAddrTable、 filterServerTable 中分别填充 TopicRouteData 中的List<QueueData>、List<BrokerData>和 filterServer 地址表
        TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

        if (topicRouteData != null) {
          // 如果找到主题对应的路由信息并且该主题为顺序消息,则从 NameServer KVconfig 中获取关于顺序消息相关的配置填充路由信息
            if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
                String orderTopicConf =
                    this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                        requestHeader.getTopic());
                topicRouteData.setOrderTopicConf(orderTopicConf);
            }

            byte[] content = topicRouteData.encode();
            response.setBody(content);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }
				// 如果找不到路由信息 CODE 则使用 TOPIC_NOT_EXISTS,表示没有找到对应的路由
        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
            + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
        return response;
    }

总结

NameServer 路由发现和心跳机制
1

评论区