Canal是Github中开源的ETL(Extract Transform Load)软件,其功能原理及详细说明请参见Canal。
使用canal将MySQL数据同步到表格存储的使用流程如下:
步骤一:准备MySQL数据源
在MySQL中,准备待同步的数据。
步骤二:创建索引
在阿里云Elasticsearch实例中,创建索引。要求Mapping中定义的字段名称和类型与待同步数据保持一致。
步骤三:安装并启动Canal-server
安装Canal-server,然后修改配置文件关联MySQL,Canal-server模拟MySQL集群的一个slave,获取MySQL集群Master节点的二进制日志(binary log),并将日志推送给Canal-adapter。 该服务负责从上游拉取binlog数据、记录位点等。
步骤四:安装并启动Canal-adapter
安装Canal-adapter,然后修改配置文件关联MySQL和Elasticsearch,以及定义MySQL数据到Elasticsearch数据的映射字段,用来将数据同步到Elasticsearch。 该服务负责对接Deployer解析过的数据,并将数据传输到目标库中
步骤五:验证增量数据同步)
在MySQL中新增、修改或删除数据,查看数据同步结果。
步骤六:验证增量数据同步(如果不需要同步全量数据,可忽略这一步) 部署完成后,canal默认会自动同步MySQL增量数据。 如果需要同步MySQL全量数据,请手动调用Client-Adapter服务的方法触发同步任务。
待全量数据同步完成后,canal会自动开始增量同步。 说明:本文部署方式仅针对于1.1.4版本,其他版本未进行测试,酌情处理
进入MySQL配置文件,修改打开binlog,准备好需要抽取的表。
本文创建的表名称为es_log_demo,包含的字段如下所示。
登录服务器,根据数据库表结构创建索引(索引若已存在则不需要创建)。
通过curl执行创建索引命令
注意 properties中的字段需要与步骤一:准备MySQL数据源中创建的字段(名称和类型)保持一致。
curl -H 'Content-Type: application/json' --user ES用户名
curl -H 'Content-Type: application/json' --user elastic:ErtyuioP!@$ -XPUT http://127.0.0.1:9200/es_log_demo -d ' { "type": "es_log_demo", "mappings": { "es_log_demo": { "properties": { "id": { "type": "keyword" }, "log_date": { "type": "date", "format": "strict_date_optional_time||epoch_millis" }, "log_level": { "type": "text" }, "log_class": { "type": "keyword" }, "line_number": { "type": "integer" }, "log_detail": { "type": "text" } } } }, "settings": { "index": { "max_result_window": "2000000000", "number_of_shards": 3, "number_of_replicas": 2 } } }'
创建成功后,返回如下结果。
{ "acknowledged" : true, "shards_acknowledged" : true, "index" : "索引名" }
下载canal.deployer-1.1.4.tar.gz包并解压。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
解压后,可以看到解压路径下的bin、conf、plugin、lib文件夹(解压默认没有外部文件夹,需要提前新建。或者使用 -C指定文件夹路径)。
canal实例名称默认为example,如果需要自定义canal实例名称,例如改为demo,请执行以下操作(可不修改)。
修改canal.properties文件中canal.destinations(默认为第96行)的值为自定义的canal实例名称,其他配置均保持默认即可。
canal.destinations = demo
在conf路径下创建以canal实例名称命名的文件夹demo,并将conf/example路径下的instance.properties文件复制到conf/demo/路径下。
修改instance.properties文件(默认在conf/example文件夹下)。
instance.properties修改后如下
################################################# ## mysql serverId , v1.0.26+ will autoGen # canal.instance.mysql.slaveId=0 # enable gtid use true/false canal.instance.gtidon=false # position info canal.instance.master.address=127.0.0.1:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # rds oss binlog canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=true #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb #canal.instance.tsdb.dbUsername=canal #canal.instance.tsdb.dbPassword=canal #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #canal.instance.standby.gtid= # username/password canal.instance.dbUsername=root canal.instance.dbPassword=QwertyuioP!@$ canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== # table regex canal.instance.filter.regex=.*\\..* # table black regex canal.instance.filter.black.regex= # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch # mq config canal.mq.topic=example # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..* canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 #canal.mq.partitionHash=test.table:id^name,.*\\..* #################################################
instance.properties文件中的主要配置项说明请参见下表。
配置项 | 是否必填 | 示例值 | 描述 |
---|---|---|---|
canal.instance.master.address | 是 | rm-bp15p07134rkvf7****.mysql.rds.aliyuncs.com:**** 127.0.0.1:3306 | canal监听的数据库地址,格式为host:port 。 |
canal.instance.rds.accesskey | 否 | LTAn********************* | 当MySQL为阿里云产品RDS库时,填写登录账号的AccessKey ID和AccessKey Secret,获取方式请参见为RAM用户创建访问密钥。如果非RDS库,无需填写此项。 |
canal.instance.rds.secretkey | 否 | zbnK************************** | |
canal.instance.rds.instanceId | 否 | rm-bp15p0713**** | 当MySQL为阿里云产品RDS库时,填写实例ID。如果非RDS库,无需填写此项。 |
canal.instance.dbUsername | 是 | root | 数据库账号用户名。 |
canal.instance.dbPassword | 是 | QwertyuioP!@$ | 数据库账号密码。 |
canal.instance.filter.regex | 是 | .\.. | canal实例关注的表。通过正则表达式匹配。此处表示匹配所有数据库下的所有表。 |
canal.destinations | 是 | demo | canal实例名称,必须与配置文件所在上层路径相同。例如配置文件的路径为conf/demo/instance.properties,则canal实例名称为demo。 |
通过bin路径下的sh脚本启动项目
./bin/startup.sh 或 sh bin/startup.sh
启动完成后查看logs/canal/canal.log
出现the canal server is running now ......即为启动成功
下载canal.adapter-1.1.4.tar.gz包并解压。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
解压后,可以看到项目路径下的bin、conf、plugin、lib文件夹(解压默认没有外部文件夹,需要提前新建。或者使用 -C指定文件夹路径)。
修改配置文件。
i. 修改conf路径下application.yml文件中表格存储相关的配置信息
修改端口
server
canal.conf
修改数据库配置
修改实例名称
canal.conf:canalAdapters:- instance,默认是example,需要与Deployer的实例名称(canal.destinations)保持一致
修改推送数据源
其他配置项的说明请参见ClientAdapter配置项说明。
配置项 | 说明 |
---|---|
canal.conf.canalServerHost | Deployer访问地址 |
canal.conf.srcDataSources.defaultDS.url | 需要设置为jdbc:mysql://<地址>:<端口>/<数据库名称>,例如jdbc:mysql://127.0.0.1:3306/demo?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&serverTimezone=Asia/Shanghai |
canal.conf.srcDataSources.defaultDS.username | MySQL数据库的账号名称 |
canal.conf.srcDataSources.defaultDS.password | MySQL数据库的密码。 |
canal.conf.canalAdapters.groups.outerAdapters.hosts | 定位到name |
canal.conf.canalAdapters.groups.outerAdapters.mode | 必须设置为rest。 |
canal.conf.canalAdapters.groups.outerAdapters.properties.security.auth | 需要设置为**<Elasticsearch实例的账号>:<密码>。例如elastic |
canal.conf.canalAdapters.groups.outerAdapters.properties.cluster.name | Elasticsearch实例的ID,没有可填写elasticsearch |
application.yml文件的完整配置示例如下:
server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: tcp # kafka rocketMQ canalServerHost: 127.0.0.1:11111 # zookeeperHosts: slave1:2181 # mqServers: 127.0.0.1:9092 #or rocketmq # flatMessage: true batchSize: 500 syncBatchSize: 1000 retries: 0 timeout: accessKey: secretKey: srcDataSources: defaultDS: url: jdbc:mysql://127.0.0.1:3306/demo?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&serverTimezone=Asia/Shanghai username: root password: QwertyuioP!@$ canalAdapters: - instance: example # canal instance Name or mq topic name groups: - groupId: g1 outerAdapters: - name: logger # - name: rdb # key: mysql1 # properties: # jdbc.driverClassName: com.mysql.jdbc.Driver # jdbc.url: jdbc:mysql://127.0.0.1:3306/my2?useUnicode=true # jdbc.username: root # jdbc.password: 121212 # - name: rdb # key: oracle1 # properties: # jdbc.driverClassName: oracle.jdbc.OracleDriver # jdbc.url: jdbc:oracle:thin:@localhost:49161:XE # jdbc.username: mytest # jdbc.password: m121212 # - name: rdb # key: postgres1 # properties: # jdbc.driverClassName: org.postgresql.Driver # jdbc.url: jdbc:postgresql://localhost:5432/postgres # jdbc.username: postgres # jdbc.password: 121212 # threads: 1 # commitSize: 3000 # - name: hbase # properties: # hbase.zookeeper.quorum: 127.0.0.1 # hbase.zookeeper.property.clientPort: 2181 # zookeeper.znode.parent: /hbase - name: es hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode properties: mode: rest # or transport security.auth: elastic:ErtyuioP!@$ # only used for rest mode cluster.name: elasticsearch
ii.同样的方式,修改conf/es/*.yml文件,定义MySQL数据到Elasticsearch数据的映射字段。
说明 如果不存在conf/es路径,请手动创建该路径。
在conf/es路径下创建.yml格式的文件
在biz_order.yml中填写以下内容并配置相应参数。
dataSourceKey: defaultDS destination: example groupId: g1 esMapping: _index: es_log_demo _type: _doc _id: _id # relations: # customer_order: # name: order # parent: customer_id sql: "select id _id, id, log_date, log_level, log_class, line_number, log_detail from es_log_demo" # skips: # - customer_id # etlCondition: "where t.c_time>={}" commitBatch: 3000
在customer.yml中填写以下内容并配置相应参数。
dataSourceKey: defaultDS destination: example groupId: g1 esMapping: _index: es_log_demo _type: _doc _id: _id # relations: # customer_order: # name: customer sql: "select id _id, id, log_date, log_level, log_class, line_number, log_detail from es_log_demo" # etlCondition: "where t.c_time>={}" commitBatch: 3000
在mytest_user.yml中填写以下内容并配置相应参数。
dataSourceKey: defaultDS destination: example groupId: g1 esMapping: _index: es_log_demo _type: _doc _id: _id upsert: true # pk: id sql: "select id _id, id, log_date, log_level, log_class, line_number, log_detail from es_log_demo" # objFields: # _labels: array:; # etlCondition: "where a.c_time>={}" commitBatch: 1000
配置项 | 说明 |
---|---|
esMapping._index | 步骤二:创建索引中,在Elasticsearch实例中所创建的索引的名称。本文使用es_log_demo。 |
esMapping._type | 步骤二:创建索引章节中,在Elasticsearch实例中所创建的索引的类型。本文使用_doc。 |
esMapping._id | 需要同步到Elasticsearch实例的文档的id,可自定义。本文使用_id。 |
esMapping.sql | SQL语句,用来查询需要同步到Elasticsearch中的字段。本文使用select id _id, id, log_date, log_level, log_class, line_number, log_detail from es_log_demo 。 |
esMapping.commitBatch | 批量提交条数,可设置为默认的3000 |
修改完成后,启动Canal-adapter服务,并查看日志。
./bin/startup.sh 或 sh bin/startup.sh
启动完成后查看logs/adapter/adapter.log
出现c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in * seconds (JVM running for *)即为启动成功
执行以下命令调用Client-Adapter服务的方法触发同步任务。此时,canal会先中止增量数据传输,然后同步全量数据。待全量数据同步完成后,canal会自动进行增量数据同步。
命令格式
curl "hostip:port/etl/type/task" -X POST
示例
curl "127.0.0.1:8081/etl/es/mytest_user.yml" -X POST
执行后需要等待,成功后会显示以下内容:
{ "succeeded": true, "resultMessage": "导入ES 数据:* 条" }
详细配置项说明请参见下表。
配置项 | 是否必选 | 示例 | 描述 |
---|---|---|---|
hostip | 是 | localhost | 部署canal服务的机器IP地址。当在部署canal服务的机器上执行此命令时,可设置hostip为localhost。 |
port | 是 | 8081 | adapter端口 |
type | 是 | es | 下游数据库类型 |
task | 是 | mytest_user.yml | 任务配置文件的名称,必须与步骤四:部署Client-Adapter中创建的.yml格式的文件名称相同。 |
Failed to bind properties under 'canal.conf' to com.alibaba.otter.canal.adapter.launcher.config.AdapterCanalConfig: Reason: Unable to set value for property src-data-sources
解决方案1:
MySQL使用的是5.7以上版本,驱动问题,lib文件夹下的mysql-connector-java-5.1.40.jar替换为mysql-connector-java-5.1.47.jar即可解决。 canal.adapter-1.1.4数据库驱动默认为5.1.40版本,随本文附带的adapter安装包驱动已修改为5.1.47。 并且5.1.47版本也放在了jar文件夹内,需要可手动替换。
解决方案2:
jdbcurl: jdbc:mysql://127.0.0.1:3306/demo 修改为 jdbcurl: jdbc:mysql://127.0.0.1:3306/demo?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&serverTimezone=Asia/Shanghai
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.IllegalArgumentException: Not found the mapping info of index: es_log_demo at com.alibaba.otter.canal.client.adapter.support.Util.sqlRS(Util.java:65) at com.alibaba.otter.canal.client.adapter.es.service.ESEtlService.executeSqlImport(ESEtlService.java:80) at com.alibaba.otter.canal.client.adapter.support.AbstractEtlService.lambda$importData$1(AbstractEtlService.java:91) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: java.lang.IllegalArgumentException: Not found the mapping info of index: es_log_demo at com.alibaba.otter.canal.client.adapter.es.service.ESEtlService.lambda$executeSqlImport$1(ESEtlService.java:206) at com.alibaba.otter.canal.client.adapter.support.Util.sqlRS(Util.java:60) ... 6 common frames omitted Caused by: java.lang.IllegalArgumentException: Not found the mapping info of index: es_log_demo at com.alibaba.otter.canal.client.adapter.es.support.ESTemplate.getEsType(ESTemplate.java:497) at com.alibaba.otter.canal.client.adapter.es.support.ESTemplate.getValFromRS(ESTemplate.java:262) at com.alibaba.otter.canal.client.adapter.es.service.ESEtlService.lambda$executeSqlImport$1(ESEtlService.java:98) ... 7 common frames omitted
解决方案1:
未创建索引,创建方式参考步骤二
解决方案2:
已创建索引,但索引字段未设置mapping,请根据实际结构设置对应的mapping字段及类型
示例:
curl -H 'Content-Type: application/json' --user elastic:ErtyuioP!@$ -XPOST http://127.0.0.1:9200/es_log_demo/_doc/_mapping?pretty -d ' { "_doc" : { "properties" : { "id": { "type": "integer" }, "log_date": { "type": "date", "format": "strict_date_optional_time||epoch_millis" }, "log_level" : { "type" : "text" }, "log_class" : { "type" : "keyword" }, "line_number" : { "type" : "integer" }, "log_detail" : { "type" : "text" } } } }'
配置项 | 说明 |
---|---|
http://127.0.0.1:9200 | ES:ip+端口 |
es_log_demo | 索引名称 |
_doc | type |
_mapping | 固定为_mapping |
pretty | 固定为pretty |
2022-04-11 14:43:30.101 [destination = example , address = /192.168.7.181:33307 , EventParser] ERROR c.a.o.canal.parse.inbound.mysql.dbsync.DirectLogFetcher - I/O error while reading from client socket java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not find first log file name in binary log index file at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:102) ~[canal.parse-1.1.4.jar:na] at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:235) [canal.parse-1.1.4.jar:na] at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:265) [canal.parse-1.1.4.jar:na] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151] 2022-04-11 14:43:30.101 [destination = example , address = /192.168.7.181:33307 , EventParser] ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - dump address /192.168.7.181:33307 has an error, retrying. caused by java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not find first log file name in binary log index file at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:102) ~[canal.parse-1.1.4.jar:na] at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:235) ~[canal.parse-1.1.4.jar:na] at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:265) ~[canal.parse-1.1.4.jar:na] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151] 2022-04-11 14:43:30.101 [destination = example , address = /192.168.7.181:33307 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:example[java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not find first log file name in binary log index file at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:102) at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:235) at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:265) at java.lang.Thread.run(Thread.java:748) ]
原因:
Canal同步是模拟MySQL集群的一个slave 当mysql主备切换时,无论binlog文件名是否相同,如果原主节点position大于主备切换后主库当前binlog的position
解决方案:
删除canal.deployer/conf/example下的meta.dat文件,重启:canal.deployer即可
2022-04-12 17:24:18.095 [MultiStageCoprocessor-Parser-example-6] ERROR com.alibaba.otter.canal.common.utils.NamedThreadFactory - from MultiStageCoprocessor-Parser-example-6 com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed. Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed. Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed. Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table:zzddt_dev.sszvcl_team_month_count,27 vs 25 2022-04-12 17:24:18.099 [MultiStageCoprocessor-Parser-example-8] ERROR com.alibaba.otter.canal.common.utils.NamedThreadFactory - from MultiStageCoprocessor-Parser-example-8 com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed. Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed. Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed. Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table:zzddt_dev.sszvcl_team_month_count,27 vs 25
原因:
修改了数据库表结构,同时也没有开启ddl同步
解决方案:
1. 删除canal.deployer/conf/example下的meta.dat文件, 2. 重启canal.deployer
本文作者:GT-IT
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!