Real-Time MySQL Data Synchronization with Canal

码农世界 2024-05-31 Backend

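  • Deploying Canal-Admin
    • 1. Pull the Canal-Admin image
    • 2. Create the directory
    • 3. Create the canal_manager database
    • 4. Start Canal-Admin
    • 5. Copy the configuration file
    • 6. Remove the Canal-Admin container
    • 7. Edit the configuration file
    • 8. Deploy Canal-Admin
    • 9. Access Canal-Admin
  • Deploying Canal-Server
    • 0. Prerequisites
    • 1. Pull the Canal-Server image
    • 2. Create the directory
    • 3. Start Canal-Server
    • 4. Copy the configuration files
    • 5. Remove the Canal-Server container
    • 6. Edit the configuration files
    • 7. Deploy Canal-Server
  • Deploying Canal-Adapter
    • 1. Create the required files
    • 2. File contents
    • 3. Build the Canal-Adapter image
    • 4. Create the directory
    • 5. Start Canal-Adapter
    • 6. Copy the configuration files
    • 7. Remove the Canal-Adapter container
    • 8. Edit the configuration files
    • 9. Deploy Canal-Adapter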

          Deploying Canal-Admin


          1. Pull the Canal-Admin image

          For compatibility with MySQL 8.0+, pull the v1.1.7 image.

          docker pull canal/canal-admin:v1.1.7
          

          2. Create the directory

          mkdir -p /data/canal-admin/conf/
          

          3. Create the canal_manager database

          CREATE DATABASE /*!32312 IF NOT EXISTS*/ `canal_manager` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */;
          USE `canal_manager`;
          SET NAMES utf8;
          SET FOREIGN_KEY_CHECKS = 0;
          -- ----------------------------
          -- Table structure for canal_adapter_config
          -- ----------------------------
          DROP TABLE IF EXISTS `canal_adapter_config`;
          CREATE TABLE `canal_adapter_config` (
            `id` bigint(20) NOT NULL AUTO_INCREMENT,
            `category` varchar(45) NOT NULL,
            `name` varchar(45) NOT NULL,
            `status` varchar(45) DEFAULT NULL,
            `content` text NOT NULL,
            `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            PRIMARY KEY (`id`)
          ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
          -- ----------------------------
          -- Table structure for canal_cluster
          -- ----------------------------
          DROP TABLE IF EXISTS `canal_cluster`;
          CREATE TABLE `canal_cluster` (
            `id` bigint(20) NOT NULL AUTO_INCREMENT,
            `name` varchar(63) NOT NULL,
            `zk_hosts` varchar(255) NOT NULL,
            `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            PRIMARY KEY (`id`)
          ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
          -- ----------------------------
          -- Table structure for canal_config
          -- ----------------------------
          DROP TABLE IF EXISTS `canal_config`;
          CREATE TABLE `canal_config` (
            `id` bigint(20) NOT NULL AUTO_INCREMENT,
            `cluster_id` bigint(20) DEFAULT NULL,
            `server_id` bigint(20) DEFAULT NULL,
            `name` varchar(45) NOT NULL,
            `status` varchar(45) DEFAULT NULL,
            `content` text NOT NULL,
            `content_md5` varchar(128) NOT NULL,
            `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            PRIMARY KEY (`id`),
            UNIQUE KEY `sid_UNIQUE` (`server_id`)
          ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
          -- ----------------------------
          -- Table structure for canal_instance_config
          -- ----------------------------
          DROP TABLE IF EXISTS `canal_instance_config`;
          CREATE TABLE `canal_instance_config` (
            `id` bigint(20) NOT NULL AUTO_INCREMENT,
            `cluster_id` bigint(20) DEFAULT NULL,
            `server_id` bigint(20) DEFAULT NULL,
            `name` varchar(45) NOT NULL,
            `status` varchar(45) DEFAULT NULL,
            `content` text NOT NULL,
            `content_md5` varchar(128) DEFAULT NULL,
            `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            PRIMARY KEY (`id`),
            UNIQUE KEY `name_UNIQUE` (`name`)
          ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
          -- ----------------------------
          -- Table structure for canal_node_server
          -- ----------------------------
          DROP TABLE IF EXISTS `canal_node_server`;
          CREATE TABLE `canal_node_server` (
            `id` bigint(20) NOT NULL AUTO_INCREMENT,
            `cluster_id` bigint(20) DEFAULT NULL,
            `name` varchar(63) NOT NULL,
            `ip` varchar(63) NOT NULL,
            `admin_port` int(11) DEFAULT NULL,
            `tcp_port` int(11) DEFAULT NULL,
            `metric_port` int(11) DEFAULT NULL,
            `status` varchar(45) DEFAULT NULL,
            `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            PRIMARY KEY (`id`)
          ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
          -- ----------------------------
          -- Table structure for canal_user
          -- ----------------------------
          DROP TABLE IF EXISTS `canal_user`;
          CREATE TABLE `canal_user` (
            `id` bigint(20) NOT NULL AUTO_INCREMENT,
            `username` varchar(31) NOT NULL,
            `password` varchar(128) NOT NULL,
            `name` varchar(31) NOT NULL,
            `roles` varchar(31) NOT NULL,
            `introduction` varchar(255) DEFAULT NULL,
            `avatar` varchar(255) DEFAULT NULL,
            `creation_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            PRIMARY KEY (`id`)
          ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
          SET FOREIGN_KEY_CHECKS = 1;
          -- ----------------------------
          -- Records of canal_user
          -- ----------------------------
          BEGIN;
          INSERT INTO `canal_user` VALUES (1, 'admin', '6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9', 'Canal Manager', 'admin', NULL, NULL, '2019-07-14 00:05:28');
          COMMIT;
          SET FOREIGN_KEY_CHECKS = 1;
          

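          4. Start Canal-Admin

          When deploying on a server, remember to open port 8089. This first run is temporary: it exists only so that the default configuration file can be copied out of the container in the next step.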

          docker run -d --name canal-admin -p 8089:8089 canal/canal-admin:v1.1.7
          

          5. Copy the configuration file

          docker cp canal-admin:/home/admin/canal-admin/conf/application.yml /data/canal-admin/conf/
          

          6. Remove the Canal-Admin container

          docker rm -f canal-admin
          

          7. Edit the configuration file

          Three values need to be changed; they are marked with comments below.

          server:
            port: 8089
          spring:
            jackson:
              date-format: yyyy-MM-dd HH:mm:ss
              time-zone: GMT+8
          spring.datasource:
            address: xxx.xxx.xxx.xxx:3306     # host and port of the MySQL instance that holds canal_manager
            database: canal_manager
            username: root                    # database user
            password: xxxxxx                  # database password
            driver-class-name: com.mysql.jdbc.Driver
            url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
            hikari:
              maximum-pool-size: 30
              minimum-idle: 1
          canal:
            adminUser: admin
            adminPasswd: admin
          

          8. Deploy Canal-Admin

          docker run --name canal-admin -p 8089:8089 \
          -v /data/canal-admin/conf/application.yml:/home/admin/canal-admin/conf/application.yml \
          -v /data/canal-admin/logs/:/home/admin/canal-admin/logs/ \
          -d canal/canal-admin:v1.1.7
          

          9. Access Canal-Admin

          Open http://xxx.xxx.xxx.xxx:8089 in a browser, replacing the address with that of your own server or virtual machine.
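If the page does not load, it can help to confirm first that the published port is reachable at all. The following is a minimal sketch and not part of the original setup: `wait_for_port` is a hypothetical helper, and it assumes that `bash` (for its `/dev/tcp` redirection) and the coreutils `timeout` command are installed.

```shell
# Hypothetical helper: poll a TCP port until it accepts connections.
# Usage: wait_for_port HOST PORT [ATTEMPTS] [DELAY_SECONDS]
wait_for_port() {
  host=$1
  port=$2
  attempts=${3:-30}
  delay=${4:-2}
  i=1
  while [ "$i" -le "$attempts" ]; do
    # bash opens a TCP connection when redirecting to /dev/tcp/HOST/PORT
    if timeout 3 bash -c "exec 3<>/dev/tcp/$host/$port" 2>/dev/null; then
      echo "$host:$port is reachable"
      return 0
    fi
    sleep "$delay"
    i=$((i + 1))
  done
  echo "$host:$port not reachable after $attempts attempts" >&2
  return 1
}

# Example (replace the address with your own server):
# wait_for_port xxx.xxx.xxx.xxx 8089
```

The same helper can be pointed at port 11111 once Canal-Server is running, or at 8081 for Canal-Adapter.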

          Deploying Canal-Server


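          0. Prerequisites

          Check that MySQL has binary logging (log_bin) enabled and that the binlog format is ROW.

          Both are usually already set (they are the defaults in MySQL 8.0). You can verify them by running the following statements in Navicat or any other database client: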

          show variables like 'log_bin';
          show variables like 'binlog_format';
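
The expected results are `log_bin = ON` and `binlog_format = ROW`. As a minimal sketch, the same check can be scripted from the shell. `check_binlog_settings` is a hypothetical helper, and the commented `mysql` invocation assumes the MySQL command-line client is installed on the machine.

```shell
# Hypothetical helper: validate the two values returned by the queries above.
# Usage: check_binlog_settings LOG_BIN_VALUE BINLOG_FORMAT_VALUE
check_binlog_settings() {
  log_bin=$1
  binlog_format=$2
  if [ "$log_bin" != "ON" ]; then
    echo "log_bin is '$log_bin', expected ON" >&2
    return 1
  fi
  if [ "$binlog_format" != "ROW" ]; then
    echo "binlog_format is '$binlog_format', expected ROW" >&2
    return 1
  fi
  echo "binlog settings look fine for Canal"
}

# Example, assuming the mysql client is available (-N drops the header row,
# -s prints tab-separated output, so the value is the second column):
# log_bin=$(mysql -h xxx.xxx.xxx.xxx -u root -p -Nse "show variables like 'log_bin'" | cut -f2)
# fmt=$(mysql -h xxx.xxx.xxx.xxx -u root -p -Nse "show variables like 'binlog_format'" | cut -f2)
# check_binlog_settings "$log_bin" "$fmt"
```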
          

          Create the canal account and grant it the required privileges by running the following statements:

          CREATE USER canal IDENTIFIED BY 'canal';  
          GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
          FLUSH PRIVILEGES;
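
To confirm that the grants took effect, the output of `SHOW GRANTS` for the new account can be checked for the three privileges named above. This is only a sketch: `has_canal_grants` is a hypothetical helper, and the commented example assumes the MySQL command-line client is installed.

```shell
# Hypothetical helper: check that SHOW GRANTS output lists the privileges
# granted above. Pass the full output as a single argument.
has_canal_grants() {
  grants=$1
  for priv in "SELECT" "REPLICATION SLAVE" "REPLICATION CLIENT"; do
    case "$grants" in
      *"$priv"*) ;;
      *)
        echo "missing privilege: $priv" >&2
        return 1
        ;;
    esac
  done
  echo "all required privileges present"
}

# Example, logging in as the canal user created above:
# has_canal_grants "$(mysql -h xxx.xxx.xxx.xxx -u canal -pcanal -Nse 'SHOW GRANTS')"
```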
          

          1. Pull the Canal-Server image

          docker pull canal/canal-server:v1.1.7
          

          2. Create the directory

          mkdir -p /data/canal-server/conf/
          

          3. Start Canal-Server

          When deploying on a server, remember to open port 11111.

          docker run -d --name canal-server -p 11111:11111 canal/canal-server:v1.1.7
          

          4. Copy the configuration files

          docker cp canal-server:/home/admin/canal-server/conf/canal.properties /data/canal-server/conf/
          docker cp canal-server:/home/admin/canal-server/conf/example/instance.properties /data/canal-server/conf/
          

          5. Remove the Canal-Server container

          docker rm -f canal-server
          

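          6. Edit the configuration files

          canal.properties: one setting needs to be changed, canal.admin.manager (it appears twice in the listing below).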

          # tcp bind ip
          canal.ip =
          # register ip to zookeeper
          canal.register.ip =
          canal.port = 11111
          canal.metrics.pull.port = 11112
          # canal admin config
          # change to the address of your Canal-Admin instance
          canal.admin.manager = xxx.xxx.xxx.xxx:8089
          canal.admin.port = 11110
          canal.admin.user = admin
          canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
          canal.zkServers =
          # flush data to zk
          canal.zookeeper.flush.period = 1000
          canal.withoutNetty = false
          # tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
          canal.serverMode = tcp
          # flush meta cursor/parse position to file
          canal.file.data.dir = ${canal.conf.dir}
          canal.file.flush.period = 1000
          ## memory store RingBuffer size, should be Math.pow(2,n)
          canal.instance.memory.buffer.size = 16384
          ## memory store RingBuffer used memory unit size , default 1kb
          canal.instance.memory.buffer.memunit = 1024 
          ## meory store gets mode used MEMSIZE or ITEMSIZE
          canal.instance.memory.batch.mode = MEMSIZE
          canal.instance.memory.rawEntry = true
          ## detecing config
          canal.instance.detecting.enable = false
          canal.instance.detecting.sql = select 1
          canal.instance.detecting.interval.time = 3
          canal.instance.detecting.retry.threshold = 3
          canal.instance.detecting.heartbeatHaEnable = false
          canal.instance.transaction.size =  1024
          # mysql fallback connected to new master should fallback times
          canal.instance.fallbackIntervalInSeconds = 60
          # network config
          canal.instance.network.receiveBufferSize = 16384
          canal.instance.network.sendBufferSize = 16384
          canal.instance.network.soTimeout = 30
          # binlog filter config
          canal.instance.filter.druid.ddl = true
          canal.instance.filter.query.dcl = false
          canal.instance.filter.query.dml = false
          canal.instance.filter.query.ddl = false
          canal.instance.filter.table.error = false
          canal.instance.filter.rows = false
          canal.instance.filter.transaction.entry = false
          canal.instance.filter.dml.insert = false
          canal.instance.filter.dml.update = false
          canal.instance.filter.dml.delete = false
          # binlog format/image check
          canal.instance.binlog.format = ROW,STATEMENT,MIXED 
          canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
          # binlog ddl isolation
          canal.instance.get.ddl.isolation = false
          # parallel parser config
          canal.instance.parser.parallel = true
          canal.instance.parser.parallelBufferSize = 256
          # table meta tsdb info
          canal.instance.tsdb.enable = true
          canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
          canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
          canal.instance.tsdb.dbUsername = canal
          canal.instance.tsdb.dbPassword = canal
          # dump snapshot interval, default 24 hour
          canal.instance.tsdb.snapshot.interval = 24
          # purge snapshot expire , default 360 hour(15 days)
          canal.instance.tsdb.snapshot.expire = 360
          #################################################
          ######### 		destinations		#############
          #################################################
          canal.destinations = example
          # conf root dir
          canal.conf.dir = ../conf
          # auto scan instance dir add/remove and start/stop instance
          canal.auto.scan = true
          canal.auto.scan.interval = 5
          canal.auto.reset.latest.pos.mode = false
          canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
          canal.instance.global.mode = spring
          canal.instance.global.lazy = false
          canal.instance.global.manager.address = ${canal.admin.manager}
          canal.instance.global.spring.xml = classpath:spring/file-instance.xml
          # canal admin config
          # change to the address of your Canal-Admin instance
          canal.admin.manager = xxx.xxx.xxx.xxx:8089
          # admin auto register
          canal.admin.register.auto = true
          canal.admin.register.cluster =
          canal.admin.register.name =
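
The `canal.admin.passwd` value in canal.properties is not plain text. It appears to be the double SHA-1 of the password in upper-case hex, the same format MySQL's legacy `PASSWORD()` function produced, minus the leading `*`; here it corresponds to `adminPasswd: admin` in Canal-Admin's application.yml. If you change that password, a sketch like the following can generate the matching value. `canal_passwd_hash` is a hypothetical helper and assumes the `openssl` command-line tool is installed.

```shell
# Hypothetical helper: SHA1(SHA1(password)) as upper-case hex, i.e. the
# value MySQL's legacy PASSWORD() function produced, minus the leading '*'.
canal_passwd_hash() {
  printf '%s' "$1" \
    | openssl dgst -sha1 -binary \
    | openssl dgst -sha1 \
    | awk '{print $NF}' \
    | tr 'a-f' 'A-F'
}

# Prints the value to put in canal.admin.passwd for the password "admin".
canal_passwd_hash admin
```

The `password` column of the `canal_user` row inserted by the canal_manager script uses the same format; that stored value matches the hash of `123456`, which would be the initial password for the `admin` user in the web UI.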
          

          instance.properties: two values need to be changed

          #################################################
          ## mysql serverId , v1.0.26+ will autoGen
          # must not clash with the server_id of the MySQL instance; 10 is fine if it is unused
          canal.instance.mysql.slaveId=10
          # enable gtid use true/false
          canal.instance.gtidon=false
          # position info
          # change to the address of the MySQL instance to monitor
          canal.instance.master.address=xxx.xxx.xxx.xxx:3306
          canal.instance.master.journal.name=
          canal.instance.master.position=
          canal.instance.master.timestamp=
          canal.instance.master.gtid=
          # rds oss binlog
          canal.instance.rds.accesskey=
          canal.instance.rds.secretkey=
          canal.instance.rds.instanceId=
          # table meta tsdb info
          canal.instance.tsdb.enable=true
          # username/password
          canal.instance.dbUsername=canal
          canal.instance.dbPassword=canal
          canal.instance.connectionCharset = UTF-8
          canal.instance.enableDruid=false
          # table regex
          canal.instance.filter.regex=.*\\..*
          # table black regex
          canal.instance.filter.black.regex=mysql\\.slave_.*
          # mq config
          canal.mq.topic=example
          canal.mq.partition=0
          canal.instance.multi.stream.on=false
          #################################################
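
One pitfall when annotating these files: Java `.properties` files only treat `#` as a comment marker when it is the first non-blank character of a line. Anything after a value, including `# ...`, is read as part of the value, so explanatory notes belong on their own line above the key. A quick check, sketched here with a hypothetical `find_inline_comments` helper, lists key/value lines that carry a trailing `#`.

```shell
# Hypothetical helper: print "LINE_NUMBER:LINE" for every key=value line
# whose value is followed by whitespace and '#'. Lines that start with
# '#' or '!' are real comments and are skipped.
find_inline_comments() {
  grep -nE '^[[:space:]]*[^#![:space:]][^=]*=.*[[:space:]]#' "$1"
}

# Example:
# find_inline_comments /data/canal-server/conf/canal.properties
# find_inline_comments /data/canal-server/conf/instance.properties
```

If a line is reported, move the note onto a separate comment line before restarting the container.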
          

          7. Deploy Canal-Server

          docker run --name canal-server -p 11111:11111 \
          -v /data/canal-server/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
          -v /data/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties \
          -v /data/canal-server/logs/:/home/admin/canal-server/logs/ \
          -d canal/canal-server:v1.1.7
          

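          Deploying Canal-Adapter


          The Canal authors have not published a Docker image for Canal-Adapter, but they do publish a release tarball, so we can build the image ourselves.

          Canal-Adapter download link: Canal-Adapter

          1. Create the required files

          Try to use the same path as this guide, /opt/canal/, because the later steps assume it. The directory should contain the downloaded canal.adapter-*.tar.gz archive together with the Dockerfile and startup.sh shown in the next step.

          2. File contents

          Dockerfile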

          FROM openjdk:11
          COPY canal.adapter-*.tar.gz /tmp/
          	
          RUN \
              mkdir -p /opt/canal-adapter && \
              tar -zxf /tmp/canal.adapter-*.tar.gz -C /opt/canal-adapter && \
              rm -r /tmp/canal.adapter-*.tar.gz
          COPY startup.sh /opt/canal-adapter/bin/startup.sh
          WORKDIR /opt/canal-adapter
          CMD ["sh", "-c", "sh /opt/canal-adapter/bin/startup.sh && tail -F logs/adapter/adapter.log"]
          

          startup.sh

          #!/bin/bash
          current_path=`pwd`
          case "`uname`" in
              Linux)
          		bin_abs_path=$(readlink -f $(dirname $0))
          		;;
          	*)
          		bin_abs_path=`cd $(dirname $0); pwd`
          		;;
          esac
          base=${bin_abs_path}/..
          export LANG=en_US.UTF-8
          export BASE=$base
          if [ -f $base/bin/adapter.pid ] ; then
              echo "found adapter.pid , Please run stop.sh first ,then startup.sh" >&2
              exit 1
          fi
          if [ ! -d $base/logs ] ; then
          	mkdir -p $base/logs
          fi
          ## set java path
          if [ -z "$JAVA" ] ; then
            JAVA=$(which java)
          fi
          ALIBABA_JAVA="/usr/alibaba/java/bin/java"
          TAOBAO_JAVA="/opt/taobao/java/bin/java"
          if [ -z "$JAVA" ]; then
            if [ -f $ALIBABA_JAVA ] ; then
            	JAVA=$ALIBABA_JAVA
            elif [ -f $TAOBAO_JAVA ] ; then
            	JAVA=$TAOBAO_JAVA
            else
              echo "Cannot find a Java JDK. Please set either set JAVA or put java (>=1.5) in your PATH." >&2
              exit 1
            fi
          fi
          case "$#"
          in
          0 )
            ;;
          2 )
            if [ "$1" = "debug" ]; then
              DEBUG_PORT=$2
              DEBUG_SUSPEND="n"
              JAVA_DEBUG_OPT="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=$DEBUG_PORT,server=y,suspend=$DEBUG_SUSPEND"
            fi
            ;;
          * )
            echo "THE PARAMETERS MUST BE TWO OR LESS.PLEASE CHECK AGAIN."
            exit;;
          esac
          str=`file -L $JAVA | grep 64-bit`
          if [ -n "$str" ]; then
          	JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -Xss256k -XX:+DisableExplicitGC -XX:+HeapDumpOnOutOfMemoryError"
          else
          	JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m "
          fi
          JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8"
          ADAPTER_OPTS="-DappName=canal-adapter"
          for i in $base/lib/*;
              do CLASSPATH=$i:"$CLASSPATH";
          done
          CLASSPATH="$base/conf:$CLASSPATH";
          echo "cd to $bin_abs_path for workaround relative path"
          cd $bin_abs_path
          echo CLASSPATH :$CLASSPATH
          exec $JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $ADAPTER_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication
          

          3. Build the Canal-Adapter image

          cd /opt/canal
          docker build -t canal/canal-adapter:v1.1.7 .
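
The build fails if any file named in a `COPY` instruction is missing from /opt/canal. A small pre-flight check can be run before `docker build`; the sketch below uses a hypothetical `check_build_context` helper and only looks for the three items the Dockerfile above refers to.

```shell
# Hypothetical helper: verify that the build directory holds everything
# the Dockerfile copies into the image.
# Usage: check_build_context [DIRECTORY]   (defaults to /opt/canal)
check_build_context() {
  dir=${1:-/opt/canal}
  status=0
  for f in Dockerfile startup.sh; do
    if [ ! -f "$dir/$f" ]; then
      echo "missing: $dir/$f" >&2
      status=1
    fi
  done
  # The Dockerfile uses a wildcard, so at least one archive must match.
  if ! ls "$dir"/canal.adapter-*.tar.gz >/dev/null 2>&1; then
    echo "missing: $dir/canal.adapter-*.tar.gz" >&2
    status=1
  fi
  return "$status"
}

# Example:
# check_build_context /opt/canal && docker build -t canal/canal-adapter:v1.1.7 /opt/canal
```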
          

          4. Create the directory

          mkdir -p /data/canal-adapter/conf/es7
          

          5. Start Canal-Adapter

          docker run -d --name canal-adapter -p 8081:8081 canal/canal-adapter:v1.1.7
          

          6. Copy the configuration files

          docker cp canal-adapter:/opt/canal-adapter/conf/application.yml /data/canal-adapter/conf/
          docker cp canal-adapter:/opt/canal-adapter/conf/bootstrap.yml /data/canal-adapter/conf/
          docker cp canal-adapter:/opt/canal-adapter/conf/es7/mytest_user.yml /data/canal-adapter/conf/es7
          

          7. Remove the Canal-Adapter container

          docker rm -f canal-adapter
          

          8. Edit the configuration files

          application.yml: seven values need to be changed

          server:
            port: 8081
          spring:
            jackson:
              date-format: yyyy-MM-dd HH:mm:ss
              time-zone: GMT+8
              default-property-inclusion: non_null
          canal.conf:
            mode: tcp #tcp kafka rocketMQ rabbitMQ
            flatMessage: true
            zookeeperHosts:
            syncBatchSize: 1000
            retries: -1
            timeout:
            accessKey:
            secretKey:
            consumerProperties:
              # canal tcp consumer
              canal.tcp.server.host: xxx.xxx.xxx.xxx:11111 # change to the address where canal-server is deployed
              canal.tcp.zookeeper.hosts:
              canal.tcp.batch.size: 500
              canal.tcp.username:
              canal.tcp.password:
            srcDataSources:
              defaultDS:
                url: jdbc:mysql://xxx.xxx.xxx:3341/infusion-xxxxx?useUnicode=true # address of the monitored database
                username: root                           # database user
                password: xxxxxxx                        # database password
            canalAdapters:
            - instance: example                          # keep the default unless the canal-server destination was renamed
              groups:
              - groupId: g1
                outerAdapters:
                - name: logger
                - name: es7
                  hosts: http://xxx.xxx.xxx:9200         # address of the Elasticsearch server
                  properties:
                    mode: rest # transport or rest
                    # security.auth: test:123456 #  only used for rest mode
                    cluster.name: es                     # Elasticsearch cluster name (here the same as the ES container name)
          

          bootstrap.yml

          canal:
            manager:
              jdbc:
                url: jdbc:mysql://xxxxxx:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
                username: root
                password: xxxxxxx
          

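          coupon_record.yml: three values need to be changed. Create this file under /data/canal-adapter/conf/es7/, using the copied mytest_user.yml as a template.

          dataSourceKey: defaultDS 
          destination: example        # keep the default example unless it was changed in canal-server
          groupId: g1 
          esMapping:
            _index: coupon_record			# name of the target index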
            _id: _id 
            sql: "select id as _id,coupon_id,create_time,use_state,openid,user_type,user_name,coupon_title,start_time,end_time,order_id,price,condition_price,del_flag from sys_coupon_record"   
            commitBatch: 3000
          

          9. Deploy Canal-Adapter

          docker run --name canal-adapter -p 8081:8081 \
          -v /data/canal-adapter/conf/application.yml:/opt/canal-adapter/conf/application.yml \
          -v /data/canal-adapter/conf/bootstrap.yml:/opt/canal-adapter/conf/bootstrap.yml \
          -v /data/canal-adapter/conf/es7:/opt/canal-adapter/conf/es7 \
          -v /data/canal-adapter/logs:/opt/canal-adapter/logs \
          -d canal/canal-adapter:v1.1.7
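
Canal streams changes from the binlog position at which it starts, so rows that already exist in `sys_coupon_record` are not indexed on their own. The Canal-Adapter documentation describes a REST endpoint for a one-off full import. The sketch below assumes it has the form `/etl/{type}/{task}` on the adapter's port 8081 and that the target index already exists, so verify both against your version before relying on it. `adapter_etl_url` is a hypothetical helper, and the example assumes `curl` is installed.

```shell
# Hypothetical helper: build the Canal-Adapter full-import (ETL) URL.
# Usage: adapter_etl_url BASE_URL ADAPTER_TYPE MAPPING_FILE
adapter_etl_url() {
  # ${1%/} strips one trailing slash from the base URL, if present
  printf '%s/etl/%s/%s\n' "${1%/}" "$2" "$3"
}

# Example: trigger a full import for the mapping file configured above.
# curl -X POST "$(adapter_etl_url http://xxx.xxx.xxx.xxx:8081 es7 coupon_record.yml)"

# Follow the adapter log to watch the import and later incremental syncs:
# docker logs -f canal-adapter
```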
          

          That completes the deployment of the three Canal components. There is a lot to take in, so work through each step carefully.
