- A+
对于中文站点来说,全文搜索一定会是一个痛点。流行的关系型数据库对于中文检索的支持基本来自于MySQL自带的ngram和第三方插件Sphinx(英文部分coreseek)。
ngram作为分词器是没什么问题的,但是MySQL对于多字段查询的支持实在是不好(这并不是说MySQL不好),你可以使用like语句,不过like并不是万能的;当然你也可以通过联合索引的方式优化,不过这会让你的表结构越来越复杂(而且效果也不见得很好)。
Sphinx是一个非常成熟的解决方案,底层的语言是C++,这使得它在性能上有着得天独厚的优势。但是由于Sphinx本身的实现问题,对增量更新的支持上并不优雅。并且由于辅助表的存在,会使得MySQL的压力进一步增大。
Elasticsearch的优点在于其天生的集群能力,不过由于其底层是Java,所以在CPU和内存开销上都不如Sphinx。但是更强大的自定义功能和复杂查询语句支持最终让我选择了Elasticsearch作为搜索系统的基础。这一点我稍后还会聊到。
Principle
我个人认为搜索是一个相对独立的功能,但是数据来源却和其他各个系统密不可分,所以如何将其他业务系统中的MySQL数据实时导入到Elasticsearch就成为了一个亟待解决的问题。
经过和杯具一起搜索后,发现go-mysql-elasticsearch和Canal这两个项目都是通过对MySQL的binlog进行解析实现同步的。
前者使用go语言,部署非常方便,同时能够支持对表和列进行选择。虽然README上面写着对MySQL 8 和 Elasticsearch 6 的支持还在TODO中,但是经过测试,它已经能正常工作。
后者来自阿里巴巴的开源项目,底层使用JAVA,除了能对表和列进行选择同步,还对SQL做了进一步的支持。canal将同步的过程抽象成“生产-消费”模型,这样一来就可以引入队列和不同的消费者(适配器)。
团队讨论后我们最终选择使用canal进行同步,因为他比go-mysql-elasticsearch的支持更好,可预见的瓶颈更小。当然另一个原因是go-mysql-elasticsearch的维护状态并不是很好,有一段时间甚至被标为deprecated。不过如果你的需求非常简单,我仍然推荐这个go语言写的同步工具。
Get familiar with canal
在使用docker部署canal前我个人建议先在容器外跑一遍,因为canal的Dockerfile写的实在是不怎么样。
正如上文提到的,canal使用了“生产-消费”模型,在本文的场景中,生产者是MySQL,而消费者是Elasticsearch。而canal和canal-adapter则在生产者和消费者之间作为消息传递的通道。
MySQL -> canal -> canal-adapter -> Elasticsearch
canal通过对消息队列的支持实现生产消费的进一步解耦,支持Kafka和RocketMQ。同时也支持Promethus的指标监控,方便实现异常报警。
MySQL -> canal -> Kafka/RocketMQ -> canal-adapter -> Elasticsearch
canal也原生支持keepalived高可用,在局域网环境下通过VRRP做到无缝切换主备节点。
How to do
Environment:
- Canal: v1.1.5
- Canal-adapter: v1.1.5
- Elasticsearch: v7.14.2
- MySQL: 8.0.x
之所以要表明环境是因为canal在1.1.5的release中仍然存在一些小问题,不过我们可以通过重新打包的方式解决,如果canal发布了新的版本,文本的问题解决部分还需要结合实际情况食用。
MySQL
MySQL部分需要对canal使用的账号授予远程访问和响应数据库的权限,同时开启binlog,这部分由于Docker已经默认配置好了,不再赘述,如果你是物理机直接部署,可以搜索相关教程。
Canal-server
canal-server 负责解析 binlog 并将消息推送给消费者。canal-server的配置相对简单,只需要指定你想创建的canal实例的名称,并且简单配置数据库的访问信息。
首先从release下载canal.deployer,并将其解压到 /usr/local/canal
目录下(canal内部的启动脚本是这样定义的),解压完成后可以可以看到conf下面有 canal.properties
和 example
文件夹
conf/
├── canal_local.properties
├── canal.properties
├── example
│ ├── h2.mv.db
│ ├── instance.properties
│ └── meta.dat
├── logback.xml
├── metrics
│ └── Canal_instances_tmpl.json
└── spring
├── base-instance.xml
├── default-instance.xml
├── file-instance.xml
├── group-instance.xml
├── memory-instance.xml
└── tsdb
├── h2-tsdb.xml
├── mysql-tsdb.xml
├── sql
│ └── create_table.sql
└── sql-map
├── sqlmap-config.xml
├── sqlmap_history.xml
└── sqlmap_snapshot.xml
前者记录了canal监听的端口(TCP模式)以及消息队列相关的配置,这里只需要更改
canal.destination = {设置的canal名称}
并且在conf目录下新建同名文件夹。将example下的instance.properties复制到其中,接下来需要在其中设置MySQL的链接信息,并使用正则表达式过滤需要的数据库和表:
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=password
# 白名单
canal.instance.filter.regex={你的数据库}.{某个表}
# 黑名单 使用正则匹配
canal..instance.filter.black.regex=mysql\\.slave_.*
配置完成后即可使用bin目录下的startup脚本启动canal-server,默认情况下是监听在11111端口。
Canal-adapter
已知1.1.5版本的adapter存在连接池问题,需要更改pom.xml重新打包,这一部分可以参考这个issue, 更为详细的指导可以参考掘金的这篇文章
替换完plugin之后,就可以开始配置adapter
conf/
├── application.yml
├── bootstrap.yml
├── es6
│ ├── biz_order.yml
│ ├── customer.yml
│ ├── mytest_user.yml
│ └── suggest.yml
├── es7
│ ├── biz_order.yml
│ ├── customer.yml
│ ├── mytest_user.yml
│ └── suggest.yml
├── hbase
│ └── mytest_person2.yml
├── kudu
│ └── kudutest_user.yml
├── logback.xml
├── META-INF
│ └── spring.factories
└── rdb
└── mytest_user.yml
可以看到conf目录下有不同的消费者配置,不过我们首先来配置application.properties,这其中设置了数据源和消费者:
srcDataSources:
{自定义数据名}:
url: jdbc:mysql://127.0.0.1:3306/{你的数据库}?useUnicode=true
username: root
password: password
canalAdapters:
- instance:{设置的canal名称} # canal instance Name or mq topic name
groups:
- groupId: g1 # 设置组名
outerAdapters:
- name: logger # 开启日志,注释之后不会再输出日志到终端
- name: es6 # 必须是es6/es7/hbase/kudu/rbd其中之一
key: mykey # Option 设置一个键
hosts: http://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
properties:
mode: rest # or rest
# security.auth: test:123456 # only used for rest mode
cluster.name: docker-cluster
接下来需要根据你设置的adapter类型在对应文件夹下新建一个yml文件
dataSourceKey: {自定义数据名}
destination: {设置的canal名称}
groupId: g1 # 组id要和上文一致
outerAdapterKey: mykey # 之前设置的key
esMapping:
_index: suggest
#_type: suggest # es6需要配置type,但是es7不需要
pk: id # 通过pk设置es的索引
sql: "select id, adddate, keywords, userid from suggest"
commitBatch: 3000
etlCondition: "where id>={} and id<{}" # 后续选择性同步的时候会用到
我们要根据application.properties中的设置修改数据源和目的地名称。如果你不是使用id作为自己的主键,可以这么写
esMapping:
_index: suggest
_id: _id
sql: "select userid as _id from suggest"
canal支持非常复杂的sql语句,不过也仍有一些限制,这些限制可以在wiki中看到
最后的 etlCondition
是用来做全量同步的使用分批处理的时候要用的。
配置完成后,同样使用bin目录下的startup脚本启动adapter,如果没有什么意外,你可以通过动态的修改数据库查看终端输出确认canal工作正常。
Full sync
canal支持增量更新,不过在第一次使用的时候需要手动触发全量同步。同步之前请确认你要同步的索引已经在elasticsearch中创建了,不然会报错(canal并不会帮你创建索引)。
canal-adapter在本地8081暴露了一个管理端口,我们可以通过REST API触发全量同步
curl -X POST http://localhost:8081/etl/{es6|es7|hbase|rdb}/{mykey}/{配置文件名}
其中mykey的部分是你配置了outerAdapterKey才需要使用的。你也可以通过定义的etlCondition对同步的范围做规定
curl -X POST http://localhost:8081/etl/{es6|es7|hbase|rdb}/{mykey}/{配置文件名} -d "params=0;1000"
这就对应了etlCondition中的两个参数,实现只同步id在0到1000的数据。
Summary
将mysql的数据同步到Elasticsearch本身并不是稀奇的事情,但是这种同步能力和生产消费模型却能让多个数据库连动。除了canal本身提供的一些第三方客户端之外,我们也可以使用消息队列的客户端,自定义消费行为,实现更灵活的处理能力
当然代价也是显著的,毕竟搜索我已经鸽了四个月了(逃
Reference:
- 我的微信
- 这是我的微信扫一扫
- 我的微信公众号
- 我的微信公众号扫一扫