侧边栏壁纸
博主头像
码森林博主等级

一起走进码森林,享受编程的乐趣,发现科技的魅力,创造智能的未来!

  • 累计撰写 146 篇文章
  • 累计创建 74 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

docker-compose 部署 RocketMQ

码森林
2022-02-17 / 0 评论 / 1 点赞 / 1,246 阅读 / 13,080 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-02-17,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

本文记录如何构建 RocketMQ 的 namesrv 和 broker 镜像,并采用阿里云镜像中心进行管理,最后使用 docker-compose 编排单点部署,主要用于学习测试。

① 源码构建 RocketMQ 镜像

下载官方源码

可以根据需求设置版本号,本文采用 version 4.9.2。

https://dist.apache.org/repos/dist/release/rocketmq/${ROCKETMQ_VERSION}/rocketmq-all-${ROCKETMQ_VERSION}-bin-release.zip

文件目录

image-20220217161708518

创建 Dockerfile 文件

FROM java:8

MAINTAINER "zwqh.top <zwqh1024@163.com>"

ARG version

# Rocketmq 版本号
ENV ROCKETMQ_VERSION ${version}

# Rocketmq 主目录
ENV ROCKETMQ_HOME /opt/rocketmq

WORKDIR  ${ROCKETMQ_HOME}

# 下载 Rocketmq, 由于下载源码比较慢,会导致构建镜像失败,先下载到本地再构建
#RUN curl https://dist.apache.org/repos/dist/release/rocketmq/${ROCKETMQ_VERSION}/rocketmq-all-${ROCKETMQ_VERSION}-bin-release.zip -o rocketmq.zip \
# && unzip rocketmq.zip \
# && mv rocketmq-all*/* . \
# && rmdir rocketmq-all*  \
# && rm rocketmq.zip

# 添加脚本
COPY rocketmq-${ROCKETMQ_VERSION} ${ROCKETMQ_HOME}
COPY scripts/ ${ROCKETMQ_HOME}/bin/

VOLUME ${ROCKETMQ_HOME}/conf

# 暴露 namesrv 端口
EXPOSE 9876

# 为 mqnamesrv 添加自定义脚本
RUN mv ${ROCKETMQ_HOME}/bin/mqnamesrv-customize ${ROCKETMQ_HOME}/bin/mqnamesrv \
 && chmod +x ${ROCKETMQ_HOME}/bin/mqnamesrv

# 为 runserver.sh 添加自定义脚本
RUN mv ${ROCKETMQ_HOME}/bin/runserver-customize.sh ${ROCKETMQ_HOME}/bin/runserver.sh \
 && chmod +x ${ROCKETMQ_HOME}/bin/runserver.sh

# 暴露代理端口
EXPOSE 10909 10911

# 为 mq 代理添加自定义脚本
RUN mv ${ROCKETMQ_HOME}/bin/mqbroker-customize ${ROCKETMQ_HOME}/bin/mqbroker \
 && chmod +x ${ROCKETMQ_HOME}/bin/mqbroker

# 为 runuroker.sh 添加自定义脚本
RUN mv ${ROCKETMQ_HOME}/bin/runbroker-customize.sh ${ROCKETMQ_HOME}/bin/runbroker.sh \
 && chmod +x ${ROCKETMQ_HOME}/bin/runbroker.sh

# 导出 Java 选项
RUN export JAVA_OPT=" -Duser.home=/opt"

WORKDIR ${ROCKETMQ_HOME}/bin

脚本 mqbroker-customize

#!/bin/bash
#
# Program: 本程序自定义RocketMQ mqbroker,用户可以通过“--jvm”添加jvm参数
#

if [ -z "$ROCKETMQ_HOME" ] ; then
  # resolve links - $0 may be a link to maven's home
  PRG="$0"

  # need this for relative symlinks
  while [ -h "$PRG" ] ; do
    ls=`ls -ld "$PRG"`
    link=`expr "$ls" : '.*-> \(.*\)$'`
    if expr "$link" : '/.*' > /dev/null; then
      PRG="$link"
    else
      PRG="`dirname "$PRG"`/$link"
    fi
  done

  saveddir=`pwd`

  ROCKETMQ_HOME=`dirname "$PRG"`/..

  # make it fully qualified
  ROCKETMQ_HOME=`cd "$ROCKETMQ_HOME" && pwd`

  cd "$saveddir"
fi

export ROCKETMQ_HOME

if [ -z "$1" ]||[ "$1" != "--jvm" ]; then
    sh ${ROCKETMQ_HOME}/bin/runbroker.sh \
     org.apache.rocketmq.broker.BrokerStartup \
     -c /opt/rocketmq/conf/broker.conf $@
else
    JVM_KEY="$1"
    JVM_VALUE="$2"
    shift 2
    sh ${ROCKETMQ_HOME}/bin/runbroker.sh ${JVM_KEY} "${JVM_VALUE}" \
     org.apache.rocketmq.broker.BrokerStartup \
     -c /opt/rocketmq/conf/broker.conf $@
fi

脚本 mqnamesrv-customize

#!/bin/sh
#
# Program: 本程序自定义RocketMQ mqnamesrv,用户可以通过“--jvm”添加jvm参数
#

if [ -z "$ROCKETMQ_HOME" ] ; then
  # resolve links - $0 may be a link to maven's home
  PRG="$0"

  # need this for relative symlinks
  while [ -h "$PRG" ] ; do
    ls=`ls -ld "$PRG"`
    link=`expr "$ls" : '.*-> \(.*\)$'`
    if expr "$link" : '/.*' > /dev/null; then
      PRG="$link"
    else
      PRG="`dirname "$PRG"`/$link"
    fi
  done

  saveddir=`pwd`

  ROCKETMQ_HOME=`dirname "$PRG"`/..

  # make it fully qualified
  ROCKETMQ_HOME=`cd "$ROCKETMQ_HOME" && pwd`

  cd "$saveddir"
fi

export ROCKETMQ_HOME

if [ -z "$1" ]||[ "$1" != "--jvm" ]; then
    sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.namesrv.NamesrvStartup $@
else
    JVM_KEY="$1"
    JVM_VALUE="$2"
    shift 2
    sh ${ROCKETMQ_HOME}/bin/runserver.sh ${JVM_KEY} "${JVM_VALUE}" org.apache.rocketmq.namesrv.NamesrvStartup $@
fi

脚本 runbroker-customize.sh

#!/bin/bash
#
# Program: 本程序自定义RocketMQ runbroker.sh,用户可以通过“--jvm”添加jvm参数
#

#===========================================================================================
# Java Environment Setting
#===========================================================================================
error_exit ()
{
    echo "ERROR: $1 !!"
    exit 1
}

[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"

export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}

#===========================================================================================
# JVM Configuration
#===========================================================================================
if [ -z "$1" ]||[ "$1" != "--jvm" ]; then
    JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
else
    JAVA_OPT="${JAVA_OPT} -server $2"
    shift 2
fi
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:/dev/shm/mq_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

numactl --interleave=all pwd > /dev/null 2>&1
if [ $? -eq 0 ]
then
        if [ -z "$RMQ_NUMA_NODE" ] ; then
                numactl --interleave=all $JAVA ${JAVA_OPT} $@
        else
                numactl --cpunodebind=$RMQ_NUMA_NODE --membind=$RMQ_NUMA_NODE $JAVA ${JAVA_OPT} $@
        fi
else
        $JAVA ${JAVA_OPT} $@
fi

脚本 runserver-customize.sh

#!/bin/bash
#
# Program: 本程序自定义RocketMQ runserver.sh,用户可以通过“--jvm”添加jvm参数
#

#===========================================================================================
# Java Environment Setting
#===========================================================================================
error_exit ()
{
    echo "ERROR: $1 !!"
    exit 1
}

[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"

export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}

#===========================================================================================
# JVM Configuration
#===========================================================================================
if [ -z "$1" ]||[ "$1" != "--jvm" ]; then
    JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
else
    JAVA_OPT="${JAVA_OPT} -server $2 -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    shift 2
fi
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8  -XX:-UseParNewGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:/dev/shm/rmq_srv_gc.log -XX:+PrintGCDetails"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT}  -XX:-UseLargePages"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
# JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

$JAVA ${JAVA_OPT} $@

构建镜像

docker build -t rocketmq:4.9.2 ${dockerfile 文件路径}

② 推送到阿里云镜像中心

# 登录
docker login --username=${username} registry.cn-hangzhou.aliyuncs.com
# 密码
${password}
# 设置 tag
docker tag cb1588a8e1ae registry.cn-hangzhou.aliyuncs.com/zwqh/rocketmq:4.9.2
# 推送
docker push registry.cn-hangzhou.aliyuncs.com/zwqh/rocketmq:4.9.2

③ docker-compose 部署 RocketMQ

在服务器创建 /opt/rocketmq 目录,在该目录下分别创建 conf、logs、store 文件夹,创建 docker-compose.yaml。

docker-compose.yaml

version: '3.5'
services:
  rmqnamesrv:
    image: registry.cn-hangzhou.aliyuncs.com/zwqh/rocketmq:4.9.2
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    volumes:
      - /opt/rocketmq/logs:/opt/rocketmq/logs
      - /opt/rocketmq/store:/opt/rocketmq/store
    environment:
      JAVA_OPT_EXT: "-server -Xms512m -Xmx512m -Xmn256m"
    command: sh mqnamesrv
    networks:
      - xunlu

  rmqbroker:
    image: registry.cn-hangzhou.aliyuncs.com/zwqh/rocketmq:4.9.2
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
    volumes:
      - /opt/rocketmq/logs:/opt/rocketmq/logs
      - /opt/rocketmq/store:/opt/rocketmq/store
      - /opt/rocketmq/conf/broker.conf:/opt/rocketmq/conf/broker.conf
    environment:
      NAMESRV_ADDR: "rmqnamesrv:9876"
      JAVA_OPT_EXT: "-server -Xms512m -Xmx512m -Xmn256m"
    command: sh mqbroker -c ../conf/broker.conf
    depends_on:
      - rmqnamesrv
    networks:
      - xunlu

  rmqconsole:
    image: apacherocketmq/rocketmq-dashboard:latest
    container_name: rmqdashboard
    ports:
      - 18082:8080
    environment:
      JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - rmqnamesrv
    networks:
      - xunlu

networks:
  xunlu:
    external: true

在 conf 目录创建 broker.conf

# 所属集群名字
brokerClusterName = DefaultCluster

# broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
# 在 broker-b.properties 使用: broker-b
brokerName = broker-a

# 0 表示 Master,> 0 表示 Slave
brokerId = 0

# nameServer地址,分号分割
namesrvAddr = 服务器IP:9876

# 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
brokerIP1 = 服务器IP

# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
#defaultTopicQueueNums=4

# 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
#autoCreateTopicEnable=true

# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
#autoCreateSubscriptionGroup=true

# Broker 对外服务的监听端口
#listenPort=10911

# 删除文件时间点,默认凌晨4点
deleteWhen = 04

# 文件保留时间,默认48小时
fileReservedTime = 48

# commitLog 每个文件的大小默认1G
#mapedFileSizeCommitLog=1073741824

# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
#mapedFileSizeConsumeQueue=300000

# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
#diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/opt/rocketmq/store
# commitLog 存储路径
storePathCommitLog=/opt/rocketmq/store/commitlog
# 消费队列存储
storePathConsumeQueue=/opt/rocketmq/store/consumequeue
# 消息索引存储路径
storePathIndex=/opt/rocketmq/store/index
# checkpoint 文件存储路径
storeCheckpoint=/opt/rocketmq/store/checkpoint
# abort 文件存储路径
abortFile=/opt/rocketmq/store/abort
# 限制的消息大小
#maxMessageSize=65536

# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000

# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole = ASYNC_MASTER

# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType = ASYNC_FLUSH

# 发消息线程池数量
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128

docker-compose 部署

# 切换目录
cd /opt/rocketmq
# 启动
docker-compose.yaml up -d 
# 停止并移除
docker-compose.yaml down

④ 访问 RocketMQ-Dashboard

http://IP:18082/

image-20220217184047413

⑤ 测试用例

Producer

/**
 * 发送同步消息
 * 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
 */
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("group_name");
        // 设置NameServer的地址
        producer.setNamesrvAddr("ip:9876");
        producer.setVipChannelEnabled(false);
        producer.setSendMsgTimeout(60000);
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 10; i++) {
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

Consumer

/**
 * 消费消息
 */
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // 设置NameServer的地址
        consumer.setNamesrvAddr("8.136.139.149:9876");

        // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe("TopicTest", "*");
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

⑥ 学习

学习 MQ:https://www.masenlin.com/categories/mq

学习 Docker:https://www.masenlin.com/categories/%E8%BF%90%E7%BB%B4

1

评论区