TiCDC 源码阅读(一)TiCDC 架构概览

2023-02-02 0 940

这一次 TiCDC 写作系列产品该文Sonbhadra从源代码微观来传授 TiCDC 的基本概念,希望能协助听众深入细致地如是说 TiCDC 。第一集该文是这一连串产品该文的第二期,主要叙述了 TiCDC 的目地、构架和统计数据并行链路,意在让听众能先期如是说 TiCDC,为写作其他源代码写作该文起著两个慢板的作用。

TiCDC 是什么?

TiCDC 是 TiDB 自然生态中的两个统计数据并行辅助工具,它能将上游 TiDB软件产业中产生的存量统计数据动态的并行到上游目地地。除了能将 TiDB 的统计数据并行至 MySQL 相容的统计资料库以外,还提供更多了并行至 Kafka 和 s3 的能力,支持 canal 和 avro 等多种不同对外开放消息协定供其他控制系统订户统计数据更改。

TiCDC 源码阅读(一)TiCDC 架构概览
关上腾讯新闻报道 查阅TNUMBERBX

上图叙述了 TiCDC 在整个 TiDB 自然生态控制系统中的边线,它处于两个上游 TiDB 软件产业和上游其他统计数据分析的尾端,甘当了两个统计传输管线的配角。

TiCDC 众所周知的应用领域情景为构筑三套 TiDB 软件产业间的characterization复制,或者相互配合其他直链的控制系统构筑统计数据软件控制系统服务项目。以下将从这两方面为大家如是说:

characterization复制

采用 TiCDC 来构筑characterization复制的 TiDB 软件产业时,根据从软件产业的采用目地,可能会对characterization软件产业的统计数据连续性有不同的要求。目前 TiCDC 提供更多了如下表所示三种等级的统计数据连续性:

TiCDC 源码阅读(一)TiCDC 架构概览
关上腾讯新闻报道 查阅TNUMBERBX
镜像连续性:透过迈入 Syncpoint 机能,能在动态的并行过程中,保证上上游软件产业在某一 TSO 的具有镜像连续性。详尽文本能参照文件格式:TiDB characterization软件产业的统计数据奇偶校验最后连续性:透过迈入 Redo Log 机能,能在上游软件产业发生机械故障的时候,保证上游软件产业的统计数据达到最后完全一致的状况。详细文本能参照文件格式:采用 Redo Log 保证统计数据连续性

统计数据软件控制系统

TiCDC 源码阅读(一)TiCDC 架构概览
关上腾讯新闻报道 查阅TNUMBERBX

目前 TiCDC 提供更多将更改统计数据并行至 Kafka 和 S3 的能力,用户能采用该机能将 TiDB 的统计数据软件控制系统进其他统计数据处理控制系统。在这种应用领域情景下,用户对统计数据采集的动态性和支持的消息格式的多样性会由较高的要求。当前我们提供更多了多种不同可供订户的消息格式(能参照配置 Kafka),并在最近一段时间内对该情景的同步速度做了一连串产品优化,听众能从之后的该文中如是说相关文本。

TiCDC 的构架

TiCDC 源码阅读(一)TiCDC 架构概览
关上腾讯新闻报道 查阅TNUMBERBX

保证统计传输的稳定性、动态性和连续性是 TiCDC 设计的核心目标。为了实现该目标,TiCDC 采用了分布式构架和无状况的服务项目模式,具有高可用和水平扩展的特性。想要深入细致如是说 CDC 的构架,我们需要先认识下面这些概念:

控制系统组件

TiKV:TiKV 内部的 CDC 组件会扫描和拼装 kv change log。提供更多输出 kv change logs 的接口供 TiCDC 订户。Capture:TiCDC 运行进程,多个 capture 组成两个 TiCDC 软件产业。并行任务Sonbhadra按照一定的调度规则被划分给两个或者多个 Capture 处理。

逻辑概念

KV change log:TiKV 提供更多的隐藏大部分内部实现细节的的 row changed event,TiCDC 从 TiKV 拉取这些 Event。Owner:一种 Capture 的配角,每个 TiCDC 软件产业同一时刻最多只存在两个 Capture 具有 Owner 身份,它负责响应用领域户的请求、调度软件产业和并行 DDL 等任务。ChangeFeed:由用户启动并行任务,两个并行任务中可能包含多张表,这些表会被 Owner 划分为多个子任务分配到不同的 Capture 进行处理。Processor:Capture 内部的逻辑线程,两个 Capture 节点中能运行多个 Processor。每个 Processor 负责处理 ChangeFeed 的两个子任务。TablePipeline:Processor 内部的统计数据并行管线,每个 TablePipeline 负责处理一张表,表的统计数据会在这个管线中处理和流转,最后被发送到上游。

基本特性

分布式:具有高可用能力,支持水平扩展。动态性:常规情景下提供更多秒级的并行能力。有序性:输出的统计数据行级别有序,并且提供更多 At least once 输出的保证。原子性:提供更多单表事务的原子性。

TiCDC 的生命周期

认识了以上的基本概念之后,我们能继续如是说一下 TiCDC 的生命周期。

Owner

TiCDC 源码阅读(一)TiCDC 架构概览
关上腾讯新闻报道 查阅TNUMBERBX

首先,我们需要知道,TiCDC 软件产业的元统计数据都会被存储到 PD 内置的 Etcd 中。当两个 TiCDC 软件产业被部署起来时,每个 Capture 都会向 Etcd 注册自己的信息,这样 Capture 就能发现彼此的存在。接着,各个 Capture 之间会竞选出两个 Owner ,Owner 选举流程在cdc/capture.go文件的函数内,下面的代码删除了一些错误处理逻辑和参数设置,只保留主要的流程:

campaignOwner

for{

//Campaigntobetheowner,itblocksuntilitbeenelected.

err:=c.campaign(ctx)

owner:=c.newOwner(c.upstreamManager)

c.setOwner(owner)

err=c.runEtcdWorker(ownerCtx,owner,…)

c.owner.AsyncStop()

c.setOwner(nil)

}

每两个 Capture 进程都会调用该函数,进入两个竞选的 Loop 中,每个 Capture 都会持续不断地在竞选 Owner。同一时间段内只有两个 Capture 会当选,其他候选者则会阻塞在这个 Loop 中,直到上两个 Owner 退出就会有新的 Capture 当选。

最后真正的竞选是透过在函数内部调用 Etcd 的接口实现的,Etcd 保证了同一时间只有两个 Key 能当选为 Owner。由于 Etcd 是高可用的服务项目,TiCDC 借助其力量实现了天然的高可用。

c.campaign(ctx)

election.Campaign

竞选到 Owner 配角的 Capture 会作为软件产业的管理者,也负责监听和响应来自用户的请求。

ChangeFeed

TiCDC 软件产业启动完毕之后,用户即可采用 TiCDC 命令行辅助工具或者 OpenAPI 创建 ChangeFeed (并行任务)。 两个 ChangeFeed 被创建之后,Owner 会负责对它进行检查和初始化,然后将以表为单位将划分为多个子任务分配给软件产业内的 Capture 进行并行。并行任务初始化的代码在cdc/owner/changefeed.go文件中。该函数的主要工作为:

向上游查询该并行任务需要并行的表的 Schema 信息,为接下来调度器分配并行任务做准备。创建两个来拉取 DDL 。因为我们需要在并行的过程中保持多个 Capture 节点上 Schema 信息的完全一致,并且保证 DML 与 DDL 并行顺序。所以我们选择仅由 Owner 这个拥有 ChangeFeed 所以信息的配角并行 DDL。ddlPuller创建,它会负责把该并行任务拆分成多个子任务,发送给别的 Capture 进行处理。scheduler

Capture 接收到 Owner 发送过来的子任务之后,就会创建出两个 Processor 来处理它接收到的子任务,Processor 会为每张表创建出两个 TablePipeline 来并行对应的表的统计数据。Processor 会周期性的把每个 TablePipeline 的状况和进度信息汇报给 Owner,由 Owner 来决定是否进行调度和状况更新等操作。 总而言之,TiCDC 软件产业和并行任务的状况信息会在 Owner 和 Processor 之间流转,而用户需要并行的统计数据信息则透过 TablePipeline 这个管线传递到上游,下两个小节Sonbhadra对 TablePipeline 进行传授,理解了它,就能理解 TiCDC 是怎么并行统计数据的。

TablePipeline

TiCDC 源码阅读(一)TiCDC 架构概览
关上腾讯新闻报道 查阅TNUMBERBX

顾名思义,TablePipeline 是两个表统计数据流动和处理的管线。Processor 接收到两个并行子任务之后,会为每一张表创建出两个 TablePipeline,如上图所示,它主要由 Puller、Sorter、Mounter 和 Sink 构成。

Puller: 负责拉取对应表在上游的更改统计数据,它隐藏了内部大量的实现细节,包括与 TiKV CDC 模块建立 gRPC 连接和反解码统计数据流等。Sorter: 负责对 Puller 输出的乱序统计数据进行排序,并且会把 Sink 来不及消费的统计数据进行落盘,起著两个蓄水池的作用。Mounter:根据事务提交时的表结构信息解析和填充行更改,将行更改转化为 TiCDC 能直接处理的统计数据结构。在这里,Mounter 需要和两个叫做 SchemaStorage 的组件进行交互,这个组件在 TiCDC 内部维护了所需表的 Schema 信息,后续会有文本对这其进行传授。Sink:将 Mounter 处理过后的统计数据进行编解码,转化为 SQL 语句或者 Kafka 消息发送到对应上游。

这种模块化的设计方式,比较有利于代码的维护和重构。值得一提的是,如果你对 TiCDC 有兴趣,希望能让它接入到当前 CDC 还不支持的上游控制系统,那么只要自己编码实现两个对应的 Sink 接口,就能达到目地。 接下来,我们以两个具体例子的方式来传授统计数据在 TiCDC 内部的流转。假设我们现在建立如下表所示表结构:

CREATETABLETEST(

NAMEVARCHAR(20)NOTNULL,

AGEINTNOTNULL,

PRIMARYKEY(NAME)

);

+——-+————-+——+——+———+——-+

|Field|Type|Null|Key|Default|Extra|

+——-+————-+——+——+———+——-+

|NAME|varchar(20)|NO|PRI|NULL||

|AGE|int(11)|NO||NULL||

+——-+————-+——+——+———+——-+

此时,在上游 TiDB 执行以下 DML:

INSERTINTOTEST(NAME,AGE)

VALUES(Jack,20);

UPDATETEST

SETAGE=25

WHERENAME=Jack;

下面我们就来看一看这两条 DML 会透过什么样的形式经过 TablePipeline ,最后写入上游。

Puller 拉取统计数据

上文中提到 Puller 负责与 TiKV CDC 组件建立 gPRC 连接然后拉取统计数据,这是/pipeline/puller.go中的 Puller 大致的工作逻辑:

n.plr=puller.New(…n.startTs,n.tableSpan(),n.tableID,n.tableName…)

n.wg.Go(func()error{

ctx.Throw(errors.Trace(n.plr.Run(ctxC)))

})

n.wg.Go(func()error{

for{

select{

case<-ctxC.Done():

returnnil

caserawKV:=<-n.plr.Output():

ifrawKV==nil{

continue

}

pEvent:=model.NewPolymorphicEvent(rawKV)

sorter.handleRawEvent(ctx,pEvent)

}

}

})

以上是经过简化的代码,能看到在方法中,有两个比较重要的参数和,它们分别从时间和空间这两个维度上叙述了我们想要拉取的统计数据范围。在 Puller 被创建出来之后,下面部分的代码分别启动了两个 goroutine,第两个负责运行 Puller 的内部逻辑,第二个则是等待 Puller 输出统计数据,然后把统计数据发给 Sorter。从中吐出来的统计数据长这个样子:

puller.New

startTs

tableSpan()

plr.Output()

//RawKVEntrynotifytheKVoperator

typeRawKVEntrystruct{

OpTypeOpType`msg:”op_type”`

Key[]byte`msg:”key”`

//nilfordeletetype

Value[]byte`msg:”value”`

//nilforinserttype

OldValue[]byte`msg:”old_value”`

StartTsuint64`msg:”start_ts”`

//CommitorresolvedTS

CRTsuint64`msg:”crts”`

}

所以,在上游 TiDB 写入的那两条 DML 语句,在到达 Puller 的时候会是这样这样的两个统计数据结构

TiCDC 源码阅读(一)TiCDC 架构概览
关上腾讯新闻报道 查阅TNUMBERBX

我们能看到 Insert 语句扫描出的统计数据只有 value 没有 old_value,而 Update 语句则被转化为一条既有 value 又有 old_value 的行更改统计数据。

这样这两条统计数据就成功的被 Puller 拉取到了 TiCDC,但是因为 TiDB 中一张表的统计数据会被分散到多个 Region 上,所以 Puller 会与多个 TiKV Region Leader 节点建立连接,然后拉取统计数据。那实际上 TiCDC 拉取到的更改统计数据可能是乱序的,我们需要对拉取到的所有统计数据进行排序才能正确的将事务按照顺序并行到上游。

Sorter 排序

TablePipeline 中的 Sorter 只是两个拥有 Sorter 名字的中转站,实际上负责对统计数据进行排序的是它背后的 Sorter Engine,Sorter Engine 的生命周期是和 Capture 完全一致的,两个 Capture 节点上的所有 Processor 会共享两个 Sorter Engine。想要如是说它是怎么工作的,能写作EventSorter 接口和其具体实现的相关代码。

在这里,我们只需要知道统计数据进入 TablePipeline 中的 Sorter 后会被排序即可。假设我们现在除了上述的两条统计数据以外,在该表上又进行了其他的写入操作,并且该操作的统计数据在另外两个 Region。最后 Puller 拉到的统计数据如下表所示:

TiCDC 源码阅读(一)TiCDC 架构概览
关上腾讯新闻报道 查阅TNUMBERBX

除了统计数据以外,我们还能看到的事件,这是两个在 TiCDC 控制系统中很重要的时间标志。当 TiCDC 收到时,能认为小于等于这个时间点提交的统计数据都已经被接收了,并且以后不会再有早于这个时间点的统计数据再发送下来,此时 TiCDC 能此为界限来将收到的统计数据并行至上游。

Resolved

Resolved

此外,我们能看到拉取到的统计数据并不是按照 commit_ts 严格排序的,Sorter 会根据 commit_ts 将它们进行排序,最后得到如下表所示的统计数据:

TiCDC 源码阅读(一)TiCDC 架构概览
关上腾讯新闻报道 查阅TNUMBERBX

现在排好顺序的事件就能往上游并行了,但是在这之前我们需要先对统计数据做一些转换,因为此时的统计数据是从 TiKV 中扫描出的 key-value,它们实际上只是一堆 bytes 统计数据,而不是上游想要消费的消息格式。

Mounter 解析

以上的 Event 统计数据从 Sorter 出来之后,Mounter 会根据其对应的表的 Schema 信息将它还原成按照表结构组织的统计数据。

typeRowChangedEventstruct{

StartTsuint64

CommitTsuint64

Table*TableName

ColInfos[]rowcodec.ColInfo

Columns[]*Column

PreColumns[]*Column

IndexColumns[][]int

}

能看到,该结构体中还原出了所有的表和列信息,并且 Columns 和 PreColumns 就对应于 value 和 old_value。当 TiCDC 拿到这些信息之后我们就能将统计数据继续下发至 Sink 组件,让其根据表信息和行更改统计数据去写上游统计资料库或者生产 Kafka 消息。值得注意的是,Mounter 进行的是一项 CPU 密集型工作,当两个表中所包含的字段较多时,Mounter 会消耗大量的计算资源。

Sink 下发统计数据

当被下发至 Sink 组件时,它身上已经包含了充分的信息,我们能将其转化为 SQL 或者特定消息格式的 Kafka 消息。在上文的构架图中我们能看到有三种 Sink,一种是接入在 Table Pipeline 中的 TableSink,另外一种是 Processor 等级共用的 ProcessorSink。它们在控制系统中有不同的作用:

RowChangedEvent

TableSink 作为一种 Table 等级的管理单位,缓存着要下发到 ProcessorSink 的统计数据,它的主要作用是方便 TiCDC 按照表为单位管理资源和进行调度ProcessorSink 作为真实要与统计资料库或者 Kafka 建立连接的 Sink 负责 SQL/Kafka 消息的转换和并行

我们再来看一看 ProcessorSink 到底如何转换这些行更改:

如果上游是统计资料库,ProcessorSink 会根据中的 Columns 和 PreColumns 来判断它到底是两个、还是操作,然后根据不同的操作类型,将其转化为 SQL 语句,然后再将其透过统计资料库连接写入上游:RowChangedEventInsertUpdateDelete

/*

因为只有Columns所以是Insert语句。

*/

INSERTINTOTEST(NAME,AGE)

VALUES(Jack,20);

/*

因为既有Columns且有PreColumns所以是Update语句。

*/

UPDATETEST

SETAGE=25

WHERENAME=Jack;如果上游是 Kafka, ProcessorSink 会作为两个Kafka Producer按照特定的消息格式将统计数据发送至 Kafka。 以Canal-JSON为例,我们上述的 Insert 语句最后会以如下表所示的 JSON 格式写入 Kafka:

{

“id”:0,

“database”:”test”,

“table”:”TEST”,

“pkNames”:[

“NAME”

],

“isDdl”:false,

“type”:”INSERT”,

“ts”:2,

sql

“:””,

“data”:[

{

“NAME”:”Jack”,

“AGE”:”25″

}

],

“old”:null

}

这样,上游 TiDB 执行的 DML 就成功的被发送到上游控制系统了。

结尾

以上就是本文的全部文本。希望在写作完上面的文本之后,听众能对 TiCDC 是什么?为什么?怎么实现?这几个问题有两个基本的答案。

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务