MSSQL 使用Change Tracking(更改跟踪)同步数据
很多时候,系统的瓶颈都在数据库的读写方面。一般解决方案都是分库、分表,但是分库分表也有自己的问题:1、连接查询(Join操作)效率降低;2、往往需要修改代码。此时,另一个常用的方法就是读写分离,主库进行增删改,从库只进行读。这样就涉及到主从数据库同步的问题。不同的数据库有不同的解决方案,对于MS SQL Server有日志传送、Always On等方案。
但是,上面的方案都是在同质的数据存储之间进行同步,简单来说,从MSSQL同步到另一台MSSQL。如果从MSSQL同步到MySql数据库,或者从MSSQL同步到消息队列或者缓存,则需要采用其他方法。不同的数据库提供了不同的解决方案,MSSQL提供了两个方案,Change Tracking 和 Change Data Capture。我在Kafka Connect 实时读取MSSQL数据到Kafka 中也做了简单的介绍。这篇文章将更详细介绍如何使用Change Tracking来实现数据同步。
开启Change Tracking
新建测试表 users 和 user_play
新建测试库DataSync,然后建两张表。users表示用户表,user_play表示用户在哪个房间进行游戏(一个虚拟的游戏数据库)。
USE [DataSync] GO CREATE TABLE [dbo].[users] ( [user_id] [int] IDENTITY ( 1, 1 ) NOT NULL, [user_name] [varchar] ( 50 ) NOT NULL, CONSTRAINT [PK_users] PRIMARY KEY CLUSTERED ( [user_id] ASC )) CREATE TABLE [dbo].[user_play] ( [id] [int] IDENTITY ( 1, 1 ) NOT NULL, [user_name] [varchar] ( 50 ) NOT NULL, [room] [varchar] ( 50 ) NOT NULL, [score] [int] NOT NULL, [remark] [varchar] ( 500 ) NULL, CONSTRAINT [PK_user_play] PRIMARY KEY CLUSTERED ( [user_name] ASC, [room] ASC ))
我们会先用users表测试,后面再看user_play表。
开启ChangeTracking
需要先在数据库开启Change Tracking,再在表上开启:
ALTER DATABASE DataSync SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)
CHANGE_RETENTION说明了变更记录保留的时间为2天(默认值);AUTO_CLEANUP说明是否开启自动清理记录,一般选择打开。
接下来现在users表上开启:
ALTER TABLE DataSync.dbo.users ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = OFF)
当开启 TRACK_COLUMNS_UPDATED 这个选项时,会记录对哪个行的哪个列进行了更新;否则只会记录对哪个行进行了更新(根据主键)。
Change Tracking的常用SQL语句
接下来我们就可以测试一下Change Tracking,它的名字(更改跟踪)已经很好的说明了它的功能:用一个long型的数字作为版本号,记录下发生变更的表的主键。如果开启了TRACK_COLUMNS_UPDATED,则会记录变更的列,否则不记录。
它只会记录哪个列发生了变更,并不会记录变更的内容。举例来说,当users表id=1的行的user_name从jimmy改为了jack,只会记录user_name,而不会记录jimmy和jack。
获取数据库中的哪些表启用了Change Tracking
我们首先要知道哪些表启用了更改跟踪,然后才能对这些启用了的表进行同步:
Select Object_Name(object_id) table_name, * from sys.change_tracking_tables
这里的几个列的含义如下:
- is_track_columns_update_on:是否开启了TRACK_COLUMNS_UPDATED。
- min_valid_version:最小可用版本。我们没有必要保存所有的变更记录(因为可能很快就处理了),否则这个数据量就会膨胀的非常大,耗费磁盘空间。那么当进行清理后,再次同步时,就不能从最开始的版本进行同步,而只能从最小可用版本。
- begin_version:初始版本号。
- cleanup_version:清理的版本号。
获取表的最小可用版本和当前版本
这里最重要的一点就是注意到:版本号是个自增字段,并且是在多个表之间共同的。我们会在后面回顾这一点。
select CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID('users')) as MinValidVersion, CHANGE_TRACKING_CURRENT_VERSION() as CurrentVersion
可以看到,在初始状态时,当前的版本为0。而最小可用版本为1(这里有点奇怪,是容易出BUG的地方)。
测试Change Tracking
插入数据
insert into users(user_name) values('子阳')
此时执行检查版本的语句,可以看到最小可用版本由0变为了1:
接下来要查看更改的记录,执行下面的SQL语句:
select * from CHANGETABLE(CHANGES [users], 0) c
每次想要获取变更信息,都需要调用CHANGETABLE这个函数。它的第一个参数是要查询的表名,这里是users,第二个参数是版本号。这个版本号的取值非常重要,因为CHANGETABLE函数将返回该版本与当前版本的差异。就此处而言,因为当前版本是1,所以会返回从版本0到版本1之间的差异。
注意这里的几个字段:
- user_id: 这个是users表的主键,如果表是复合主键,那么就会有多个字段。
- SYS_CHANGE_OPERATION: “I”表示新增,“U”表示更新,“D”表示删除。
- SYS_CHANGE_VERSION: 表示该行的当前版本。
- SYS_CHANGE_CREATION_VERSION: 表示该行创建时的版本。
SYS_CHANGE_VERSION 可以用于解决一些不一致问题,在 MSSQL ChangeTracking的不一致问题中介绍。
如果修改下上面的语句,将version传入为1,因为当前版本也是1,两个版本相同,返回空行:
注意到当前可用版本为1,因此我们在同步时,如果上一次同步的结果小于MinValidVersion,则从 MinValidVersion-1开始同步(如果从MinValidVersion开始,那么会少了最小可用版本的数据),C#伪代码如下:
if(lastVersion < minValidVersion){ lastVersion = minValidVersion - 1; } // 接下来 // 1. 调用 select CHANGE_TRACKING_CURRENT_VERSION(), 获取当前版本currentVersion,并保存之;同时作为下一次同步时的 lastVersion // 2. 调用 select * from CHANGETABLE(CHANGES [table_name], lastVersion) c 获取从lastVersion到currentVersion的变更数据
在做同步时,我们不仅需要知道哪些行更新了,也需要知道更新的内容。虽然上面只返回了主键,但这个可以很容易地通过join连接查询来实现:
select t.*, c.SYS_CHANGE_OPERATION from CHANGETABLE(CHANGES [users], 0) c left join users t on c.user_id = t.user_id
更改数据
了解了这个机制以后,后面的就很容易了,调用下面的语句将user_name从“子阳”改为“jimmy”:
update users set user_name='jimmy' where user_id=1
先查看当前的版本:
此时,可以看到当前的版本变为了2。接下来再查看哪些行进行了变更,为了进行对比,分别调用CHANGETABLE(CHANGES [users], 0)和CHANGETABLE(CHANGES [users], 1):
注意上图的SYS_CHANGE_OPERATION为I,因为版本0和版本2的差异是新增了一条数据。
这里的SYS_CHANGE_OPERATION为U,因为版本1和版本2的差异是更新了一条数据。
这个机制可以用于同步多个数据池,可能每个数据池同步的进度不一致,那么就需要传入不同的version,然后根据返回的参数进行操作。如果是I,则插入数据;如果是U则更新数据。
如果输入CHANGETABLE(CHANGES [users], 2),返回空行,因为当前的版本也是2.
删除数据
接下来执行删除操作,删除user_id为1的数据:
delete users where user_id=1
删除操作比较有意思,因为不论哪个版本,和当前版本(现在已经为3了)对比,都是少了一条数据,因此CHANGETABLE(CHANGES [users], 0)、CHANGETABLE(CHANGES [users], 1)、CHANGETABLE(CHANGES [users], 2) 的SYS_CHANGE_OPERATION均为D。
其他注意事项
当有多个表时,当前版本是共用的
换句话来说,就是:当前版本是针对数据库的,不是针对表的。
还记得我们上面创建了两张表吧?先往user_play表中插入一条数据
insert into user_play(user_name, room, score) values('子阳', '标准场', 100)
接着开启user_play表的Change Tracking:
ALTER TABLE DataSync.dbo.user_play ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON)
然后查询一下两张表的最小可用版本:
因为user_play表是在当前版本为3时才开启Change Tracking功能,因此它的最小可用版本为3。而CurrentVersion对于user_play和users表都是一样的。
此时,对于user_play而言,不论 CHANGETABLE(CHANGES [user_play], version) 中的版本传入几,都会返回空行,因为开启Chnage Tracking之前的记录是无法获得的。因此前面插入的记录是无法同步的,只能同步开启Change Tracking后的记录。
如果此时向user_play再插入一条记录,其最小可用版本不变,但CurrentVersion变为了4。因此,不论是从MinValidVersion(3)开始,还是从MinValidVersion-1(2)开始,都可以获得正确结果。
通过TRACK_COLUMNS_UPDATED获取更新的列
在开启user_play表的Change Tracking时,我们将TRACK_COLUMNS_UPDATED选项设置为了NO,它可以用来记录当执行Update操作时,更新了哪些列。先执行下面的语句更新一下user_play表:
update user_play set score=150 where id=1
查看一下变更:
这里最后的两列user_name和room是user_play表的主键(对于users表来说是user_id),这里是动态的,总是所查询表的主键。当主键为复合主键时,就会列出主键包含的所有列。
看到SYS_CHANGE_COLUMNS列不再为NULL,但是一个二进制数据,为了得到它的值,需要调用CHANGE_TRACKING_IS_COLUMN_IN_MASK函数。
Select CHANGE_TRACKING_IS_COLUMN_IN_MASK( COLUMNPROPERTY(OBJECT_ID('user_play'), 'score', 'ColumnId'), c.sys_change_columns ) score_changed, CHANGE_TRACKING_IS_COLUMN_IN_MASK( COLUMNPROPERTY(OBJECT_ID('user_play'), 'id', 'ColumnId'), c.sys_change_columns ) id_changed, * from CHANGETABLE(CHANGES [user_play], 4) c
通常来说,是不对主键和自增列进行更新操作的,只会新增和删除。在上面我们查询id列的变化只是作为一个示范。在上表中,并没有返回变化的值(100 --> 150),而只是返回了是否有变化:1表示有变化,0表示没有变化。
因此,后续的同步操作通常是:对于发生变化的列,执行update操作。但是这样通常显得比较繁琐,更简单的办法,和上面的insert时是一样的:不去查询哪个列发生了变化,通过join操作,查询出所有列(包含变化和没变化的),然后对所有非主键、非自增列进行update操作。
Change Tracking不记录中间的变化过程,只记录最终变化的是什么。这句话的意思就是:如果你连续执行很多个update,再调用CHANGETABLE语句(假设当前版本从4递增到了9),那么不轮version传入的是4、5、6、7、8,都返回的是同样的结果:“某一行数据发生了update变化,主键是什么,仅此而已”。但对于我们而言,这也就足够了。
总结
这篇文章演示了如何使用MSSQL的Change Tracking功能来获取变更数据,进而可以通过编写代码来实现数据同步的功能。Change Tracking是一个轻量级的方案,在实际应用中主从服务器的开销并不大。它适合用来将MSSQL数据库中的数据同步到其他异质的数据源中,例如MySQL、Redis、消息队列、HDFS中。
感谢阅读,希望这篇文章能给你带来帮助!