2022-09-04Canal00
请注意,本文编写于 738 天前,最后修改于 490 天前,其中某些信息可能已经过时。

背景信息

Canal是Github中开源的ETL(Extract Transform Load)软件,其功能原理及详细说明请参见Canal

使用流程

使用canal将MySQL数据同步到表格存储的使用流程如下:

  1. 步骤一:准备MySQL数据源

    在MySQL中,准备待同步的数据。

  2. 步骤二:创建索引

    在阿里云Elasticsearch实例中,创建索引。要求Mapping中定义的字段名称和类型与待同步数据保持一致。

  3. 步骤三:安装并启动Canal-server

    安装Canal-server,然后修改配置文件关联MySQL,Canal-server模拟MySQL集群的一个slave,获取MySQL集群Master节点的二进制日志(binary log),并将日志推送给Canal-adapter。 该服务负责从上游拉取binlog数据、记录位点等。

  4. 步骤四:安装并启动Canal-adapter

    安装Canal-adapter,然后修改配置文件关联MySQL和Elasticsearch,以及定义MySQL数据到Elasticsearch数据的映射字段,用来将数据同步到Elasticsearch。 该服务负责对接Deployer解析过的数据,并将数据传输到目标库中

  5. 步骤五:验证增量数据同步)

    在MySQL中新增、修改或删除数据,查看数据同步结果。

  6. 步骤六:验证增量数据同步(如果不需要同步全量数据,可忽略这一步) 部署完成后,canal默认会自动同步MySQL增量数据。 如果需要同步MySQL全量数据,请手动调用Client-Adapter服务的方法触发同步任务。

待全量数据同步完成后,canal会自动开始增量同步。 在这里插入图片描述 说明:本文部署方式仅针对于1.1.4版本,其他版本未进行测试,酌情处理

步骤一:准备MySQL数据源

进入MySQL配置文件,修改打开binlog,准备好需要抽取的表。

本文创建的表名称为es_log_demo,包含的字段如下所示。

步骤二:创建索引

  1. 登录服务器,根据数据库表结构创建索引(索引若已存在则不需要创建)。

  2. 通过curl执行创建索引命令

    注意 properties中的字段需要与步骤一:准备MySQL数据源中创建的字段(名称和类型)保持一致。

    curl -H 'Content-Type: application/json' --user ES用户名密码 -XPUT http://127.0.0.1:9200/需要创建的索引名 -d {索引配置}

    curl -H 'Content-Type: application/json' --user elastic:ErtyuioP!@$ -XPUT http://127.0.0.1:9200/es_log_demo -d '
    {
        "type": "es_log_demo",
        "mappings": {
            "es_log_demo": {
                "properties": {
                    "id": {
                        "type": "keyword"
                    },
                    "log_date": {
                        "type": "date",
                        "format": "strict_date_optional_time||epoch_millis"
                    },
                    "log_level": {
                        "type": "text"
                    },
                    "log_class": {
                        "type": "keyword"
                    },
                    "line_number": {
                        "type": "integer"
                    },
                    "log_detail": {
                        "type": "text"
                    }
                }
            }
        },
        "settings": {
            "index": {
                "max_result_window": "2000000000",
                "number_of_shards": 3,
                "number_of_replicas": 2
            }
        }
    }'

    创建成功后,返回如下结果。

    {
      "acknowledged" : true,
      "shards_acknowledged" : true,
      "index" : "索引名"
    }

步骤三:安装并启动Canal-server(Deployer)

  1. 下载canal.deployer-1.1.4.tar.gz包并解压。

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

    解压后,可以看到解压路径下的bin、conf、plugin、lib文件夹(解压默认没有外部文件夹,需要提前新建。或者使用 -C指定文件夹路径)。

  2. canal实例名称默认为example,如果需要自定义canal实例名称,例如改为demo,请执行以下操作(可不修改)。

    1. 修改canal.properties文件中canal.destinations(默认为第96行)的值为自定义的canal实例名称,其他配置均保持默认即可。

      canal.destinations = demo
    2. 在conf路径下创建以canal实例名称命名的文件夹demo,并将conf/example路径下的instance.properties文件复制到conf/demo/路径下。

  3. 修改instance.properties文件(默认在conf/example文件夹下)。

    1. 修改需要抽取的数据库地址,canal.instance.master.address(默认在第9行),例如:127.0.0.1:3306
    2. 修改需要抽取的数据库用户名,canal.instance.dbUsername(默认在第33行),例如:root
    3. 修改需要抽取的数据库密码,canal.instance.dbPassword(默认在第34行),例如:QwertyuioP!@$
  4. instance.properties修改后如下

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=QwertyuioP!@$
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

instance.properties文件中的主要配置项说明请参见下表。

配置项是否必填示例值描述
canal.instance.master.addressrm-bp15p07134rkvf7****.mysql.rds.aliyuncs.com:****
127.0.0.1:3306
canal监听的数据库地址,格式为host:port
canal.instance.rds.accesskeyLTAn*********************当MySQL为阿里云产品RDS库时,填写登录账号的AccessKey ID和AccessKey Secret,获取方式请参见为RAM用户创建访问密钥如果非RDS库,无需填写此项。
canal.instance.rds.secretkeyzbnK**************************
canal.instance.rds.instanceIdrm-bp15p0713****当MySQL为阿里云产品RDS库时,填写实例ID。如果非RDS库,无需填写此项。
canal.instance.dbUsernameroot数据库账号用户名。
canal.instance.dbPasswordQwertyuioP!@$数据库账号密码。
canal.instance.filter.regex.\..canal实例关注的表。通过正则表达式匹配。此处表示匹配所有数据库下的所有表。
canal.destinationsdemocanal实例名称,必须与配置文件所在上层路径相同。例如配置文件的路径为conf/demo/instance.properties,则canal实例名称为demo。
  1. 通过bin路径下的sh脚本启动项目

    ./bin/startup.sh
    sh bin/startup.sh

    启动完成后查看logs/canal/canal.log

    出现the canal server is running now ......即为启动成功

步骤四:部署Client-Adapter

  1. 下载canal.adapter-1.1.4.tar.gz包并解压。

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz

    解压后,可以看到项目路径下的bin、conf、plugin、lib文件夹(解压默认没有外部文件夹,需要提前新建。或者使用 -C指定文件夹路径)。

  2. 修改配置文件。

    i. 修改conf路径下application.yml文件中表格存储相关的配置信息

    1. 修改端口

      server,默认为8081

      canal.conf,默认在第11行,Deployer的IP和端口

    2. 修改数据库配置

      1. canal.conf:srcDataSources:defaultDS,默认在23行,示例:jdbc:mysql://127.0.0.1:3306/demo?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&serverTimezone=Asia/Shanghai
      2. canal.conf:srcDataSources:defaultDS,默认在24行,示例:root
      3. canal.conf:srcDataSources:defaultDS,默认在25行,示例:QwertyuioP!@$
    3. 修改实例名称

      canal.conf:canalAdapters:- instance,默认是example,需要与Deployer的实例名称(canal.destinations)保持一致

    4. 修改推送数据源

      1. 推送到ES
        1. 推送到哪个数据库canal.conf.groups:outerAdapters,默认在60行,示例:es,可不修改
        2. ES地址:canal.conf.groups:outerAdapters,默认在61行,示例:127.0.0.1:9200
        3. ES配置:canal.conf.groups:outerAdapters:properties,默认在63行,示例:rest(可为rest与transport,但需要固定为rest)
        4. ES用户名密码:canal.conf.groups:outerAdapters:properties.auth,默认在64行,示例(格式:用户名:密码):elastic!@$
        5. canal.conf.groups:outerAdapters:properties.name,默认在65行,示例:elasticsearch

    其他配置项的说明请参见ClientAdapter配置项说明

    配置项说明
    canal.conf.canalServerHostDeployer访问地址
    canal.conf.srcDataSources.defaultDS.url需要设置为jdbc:mysql://<地址>:<端口>/<数据库名称>,例如jdbc:mysql://127.0.0.1:3306/demo?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&serverTimezone=Asia/Shanghai
    canal.conf.srcDataSources.defaultDS.usernameMySQL数据库的账号名称
    canal.conf.srcDataSources.defaultDS.passwordMySQL数据库的密码。
    canal.conf.canalAdapters.groups.outerAdapters.hosts定位到name的位置,将hosts替换为**<Elasticsearch实例的内网地址>:<内网端口>,相关信息可在Elasticsearch实例的基本信息页面获取。例如es-cn-v64xxxxxxxxx3medp.elasticsearch.aliyuncs.com:9200**。
    canal.conf.canalAdapters.groups.outerAdapters.mode必须设置为rest。
    canal.conf.canalAdapters.groups.outerAdapters.properties.security.auth需要设置为**<Elasticsearch实例的账号>:<密码>。例如elastic**。
    canal.conf.canalAdapters.groups.outerAdapters.properties.cluster.nameElasticsearch实例的ID,没有可填写elasticsearch

    application.yml文件的完整配置示例如下:

       server:
         port: 8081
       spring:
         jackson:
           date-format: yyyy-MM-dd HH:mm:ss
           time-zone: GMT+8
           default-property-inclusion: non_null
       
       canal.conf:
         mode: tcp # kafka rocketMQ
         canalServerHost: 127.0.0.1:11111
       #  zookeeperHosts: slave1:2181
       #  mqServers: 127.0.0.1:9092 #or rocketmq
       #  flatMessage: true
         batchSize: 500
         syncBatchSize: 1000
         retries: 0
         timeout:
         accessKey:
         secretKey:
         srcDataSources:
           defaultDS:
             url: jdbc:mysql://127.0.0.1:3306/demo?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&serverTimezone=Asia/Shanghai
             username: root
             password: QwertyuioP!@$
         canalAdapters:
         - instance: example # canal instance Name or mq topic name
           groups:
           - groupId: g1
             outerAdapters:
             - name: logger
       #      - name: rdb
       #        key: mysql1
       #        properties:
       #          jdbc.driverClassName: com.mysql.jdbc.Driver
       #          jdbc.url: jdbc:mysql://127.0.0.1:3306/my2?useUnicode=true
       #          jdbc.username: root
       #          jdbc.password: 121212
       #      - name: rdb
       #        key: oracle1
       #        properties:
       #          jdbc.driverClassName: oracle.jdbc.OracleDriver
       #          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
       #          jdbc.username: mytest
       #          jdbc.password: m121212
       #      - name: rdb
       #        key: postgres1
       #        properties:
       #          jdbc.driverClassName: org.postgresql.Driver
       #          jdbc.url: jdbc:postgresql://localhost:5432/postgres
       #          jdbc.username: postgres
       #          jdbc.password: 121212
       #          threads: 1
       #          commitSize: 3000
       #      - name: hbase
       #        properties:
       #          hbase.zookeeper.quorum: 127.0.0.1
       #          hbase.zookeeper.property.clientPort: 2181
       #          zookeeper.znode.parent: /hbase
             - name: es
               hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
               properties:
                 mode: rest # or transport
                 security.auth: elastic:ErtyuioP!@$ #  only used for rest mode
                 cluster.name: elasticsearch

    ii.同样的方式,修改conf/es/*.yml文件,定义MySQL数据到Elasticsearch数据的映射字段。

    说明 如果不存在conf/es路径,请手动创建该路径。

    1. 在conf/es路径下创建.yml格式的文件

      1. biz_order.yml
      2. customer.yml
      3. mytest_user.yml(全量推送数据时使用)
    2. 在biz_order.yml中填写以下内容并配置相应参数。

      1. dataSourceKey,与application.yml中的canal.conf.srcDataSources.defaultDS一致,示例:defaultDS
      2. destination,与deployer的实例名一致,示例:example
      3. groupId,与application.yml中的canal.conf.canalAdapters.groups:-groupId一致,示例:g1
      dataSourceKey: defaultDS
      destination: example
      groupId: g1
      esMapping:
        _index: es_log_demo
        _type: _doc
        _id: _id
      #  relations:
      #    customer_order:
      #      name: order
      #      parent: customer_id
        sql: "select id _id, id, log_date, log_level, log_class, line_number, log_detail from es_log_demo"
      #  skips:
      #    - customer_id
      #  etlCondition: "where t.c_time>={}"
        commitBatch: 3000
    3. 在customer.yml中填写以下内容并配置相应参数。

      dataSourceKey: defaultDS
      destination: example
      groupId: g1
      esMapping:
        _index: es_log_demo
        _type: _doc
        _id: _id
      #  relations:
      #    customer_order:
      #      name: customer
        sql: "select id _id, id, log_date, log_level, log_class, line_number, log_detail from es_log_demo"
      #  etlCondition: "where t.c_time>={}"
        commitBatch: 3000
    4. 在mytest_user.yml中填写以下内容并配置相应参数。

      dataSourceKey: defaultDS
      destination: example
      groupId: g1
      esMapping:
        _index: es_log_demo
        _type: _doc
        _id: _id
        upsert: true
      #  pk: id
        sql: "select id _id, id, log_date, log_level, log_class, line_number, log_detail from es_log_demo"
      #  objFields:
      #    _labels: array:;
      #  etlCondition: "where a.c_time>={}"
        commitBatch: 1000
      配置项说明
      esMapping._index步骤二:创建索引中,在Elasticsearch实例中所创建的索引的名称。本文使用es_log_demo
      esMapping._type步骤二:创建索引章节中,在Elasticsearch实例中所创建的索引的类型。本文使用_doc
      esMapping._id需要同步到Elasticsearch实例的文档的id,可自定义。本文使用_id
      esMapping.sqlSQL语句,用来查询需要同步到Elasticsearch中的字段。本文使用select id _id, id, log_date, log_level, log_class, line_number, log_detail from es_log_demo
      esMapping.commitBatch批量提交条数,可设置为默认的3000
  3. 修改完成后,启动Canal-adapter服务,并查看日志。

    ./bin/startup.sh
    sh bin/startup.sh

    启动完成后查看logs/adapter/adapter.log

    出现c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in * seconds (JVM running for *)即为启动成功

步骤五:验证增量数据同步

  1. 在MySQL数据库对应的表中,新增、修改或删除表的记录。
  2. 登录Elasticsearch-head查看对应的索引中是否有数据变更。

步骤六:同步MySQL全量数据(如果不需要同步全量数据,可忽略这一步)

执行以下命令调用Client-Adapter服务的方法触发同步任务。此时,canal会先中止增量数据传输,然后同步全量数据。待全量数据同步完成后,canal会自动进行增量数据同步。

  • 命令格式

    curl "hostip:port/etl/type/task" -X POST
  • 示例

    curl "127.0.0.1:8081/etl/es/mytest_user.yml" -X POST

执行后需要等待,成功后会显示以下内容:

{
	"succeeded": true,
	"resultMessage": "导入ES 数据:* 条"
}

详细配置项说明请参见下表。

配置项是否必选示例描述
hostiplocalhost部署canal服务的机器IP地址。当在部署canal服务的机器上执行此命令时,可设置hostip为localhost。
port8081adapter端口
typees下游数据库类型
taskmytest_user.yml任务配置文件的名称,必须与步骤四:部署Client-Adapter中创建的.yml格式的文件名称相同。

常见问题及解决方式

1. 部署后启动出现(src-data-sources******)

Failed to bind properties under 'canal.conf' to com.alibaba.otter.canal.adapter.launcher.config.AdapterCanalConfig:
Reason: Unable to set value for property src-data-sources

解决方案1:

​ MySQL使用的是5.7以上版本,驱动问题,lib文件夹下的mysql-connector-java-5.1.40.jar替换为mysql-connector-java-5.1.47.jar即可解决。 ​ canal.adapter-1.1.4数据库驱动默认为5.1.40版本,随本文附带的adapter安装包驱动已修改为5.1.47。 ​ 并且5.1.47版本也放在了jar文件夹内,需要可手动替换。

解决方案2:

jdbcurl: jdbc:mysql://127.0.0.1:3306/demo
修改为
jdbcurl: jdbc:mysql://127.0.0.1:3306/demo?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&serverTimezone=Asia/Shanghai

2. 启动adapter出现以下问题(Not found the mapping info of index:******):

java.lang.RuntimeException: java.lang.RuntimeException: java.lang.IllegalArgumentException: Not found the mapping info of index: es_log_demo
        at com.alibaba.otter.canal.client.adapter.support.Util.sqlRS(Util.java:65)
        at com.alibaba.otter.canal.client.adapter.es.service.ESEtlService.executeSqlImport(ESEtlService.java:80)
        at com.alibaba.otter.canal.client.adapter.support.AbstractEtlService.lambda$importData$1(AbstractEtlService.java:91)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.IllegalArgumentException: Not found the mapping info of index: es_log_demo
        at com.alibaba.otter.canal.client.adapter.es.service.ESEtlService.lambda$executeSqlImport$1(ESEtlService.java:206)
        at com.alibaba.otter.canal.client.adapter.support.Util.sqlRS(Util.java:60)
        ... 6 common frames omitted
Caused by: java.lang.IllegalArgumentException: Not found the mapping info of index: es_log_demo
        at com.alibaba.otter.canal.client.adapter.es.support.ESTemplate.getEsType(ESTemplate.java:497)
        at com.alibaba.otter.canal.client.adapter.es.support.ESTemplate.getValFromRS(ESTemplate.java:262)
        at com.alibaba.otter.canal.client.adapter.es.service.ESEtlService.lambda$executeSqlImport$1(ESEtlService.java:98)
        ... 7 common frames omitted

解决方案1:

​ 未创建索引,创建方式参考步骤二

解决方案2:

​ 已创建索引,但索引字段未设置mapping,请根据实际结构设置对应的mapping字段及类型

​ 示例:

curl -H 'Content-Type: application/json' --user elastic:ErtyuioP!@$ -XPOST http://127.0.0.1:9200/es_log_demo/_doc/_mapping?pretty -d '
{
    "_doc" : {
        "properties" : {
            "id": {          
               "type": "integer"       
            },
            "log_date": {
               "type": "date",
               "format": "strict_date_optional_time||epoch_millis"
            },
            "log_level" : {
                "type" : "text"                   
            },
            "log_class" : {
                "type" : "keyword"                    
            },
            "line_number" : {
                "type" : "integer"                    
            },
            "log_detail" : {
                "type" : "text"                    
            }
        }
    }
}'
配置项说明
http://127.0.0.1:9200ES:ip+端口
es_log_demo索引名称
_doctype
_mapping固定为_mapping
pretty固定为pretty

3. 启动canal.deployer后logs/demo/demo.log,出现以下报错(Could not find first log file name in binary log index file)

2022-04-11 14:43:30.101 [destination = example , address = /192.168.7.181:33307 , EventParser] ERROR c.a.o.canal.parse.inbound.mysql.dbsync.DirectLogFetcher - I/O error while reading from client socket
java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not find first log file name in binary log index file
        at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:102) ~[canal.parse-1.1.4.jar:na]
        at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:235) [canal.parse-1.1.4.jar:na]
        at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:265) [canal.parse-1.1.4.jar:na]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
2022-04-11 14:43:30.101 [destination = example , address = /192.168.7.181:33307 , EventParser] ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - dump address /192.168.7.181:33307 has an error, retrying. caused by 
java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not find first log file name in binary log index file
        at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:102) ~[canal.parse-1.1.4.jar:na]
        at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:235) ~[canal.parse-1.1.4.jar:na]
        at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:265) ~[canal.parse-1.1.4.jar:na]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
2022-04-11 14:43:30.101 [destination = example , address = /192.168.7.181:33307 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:example[java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not find first log file name in binary log index file
        at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:102)
        at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:235)
        at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:265)
        at java.lang.Thread.run(Thread.java:748)
]

原因:

​ Canal同步是模拟MySQL集群的一个slave ​ 当mysql主备切换时,无论binlog文件名是否相同,如果原主节点position大于主备切换后主库当前binlog的position

解决方案:

​ 删除canal.deployer/conf/example下的meta.dat文件,重启:canal.deployer即可

4. 启动canal.deployer后logs/demo/demo.log,出现以下报错(parse row data failed.)

2022-04-12 17:24:18.095 [MultiStageCoprocessor-Parser-example-6] ERROR com.alibaba.otter.canal.common.utils.NamedThreadFactory - from MultiStageCoprocessor-Parser-example-6
com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table:zzddt_dev.sszvcl_team_month_count,27 vs 25
2022-04-12 17:24:18.099 [MultiStageCoprocessor-Parser-example-8] ERROR com.alibaba.otter.canal.common.utils.NamedThreadFactory - from MultiStageCoprocessor-Parser-example-8
com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table:zzddt_dev.sszvcl_team_month_count,27 vs 25

原因:

​ 修改了数据库表结构,同时也没有开启ddl同步

解决方案:

1. 删除canal.deployer/conf/example下的meta.dat文件,
2. 重启canal.deployer

本文作者:GT-IT

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!