在本系列的第三篇文章中,我们讨论了使用 AWS DMS 将数据流复制到 Amazon Kinesis 数据流目标端点时需要关注的关键考虑因素。我们涵盖了如何优化复制性能的多种策略,包括使用 VPC 接口端点、选择合适的分区键、配置足够的分片数量、处理大对象 (LOB)、确保数据顺序和对 NULL 值的管理等。以下是一些关键要点:
利用 VPC 接口端点保持端到端通信的私密性确保选择合适的分区键以优化数据流根据需要配置足够数量的分片以满足高吞吐量避免向 Kinesis 插入超过 1MB 的负载消息管理无序数据和重复数据的问题在本系列的第一部分中,我们讨论了多线程全量加载和变更数据捕获CDC设置的高层架构,以调整相关参数,以实现更好的 AWS 数据库迁移服务 (AWS DMS) 复制性能。在第二部分中,我们提供了一些示例,展示了如何通过调整多线程设置获得不同的结果。在本篇文章中,我们将进一步探讨使用 Kinesis 数据流作为目标时的其他关键考虑事项。

要跟随本文,您应该熟悉以下 AWS 服务:
AWS DMSAmazon Kinesis 数据流Amazon RDS for PostgreSQL从 AWS DMS 版本 347 开始,AWS DMS 现在支持与 Amazon Kinesis、Amazon S3、Amazon DynamoDB 和 Amazon Redshift 等服务通过 Amazon VPC 端点进行通信。如以下图所示,AWS DMS 复制任务与 Kinesis 的目标之间的数据是私密的,这确保了持续的带宽和吞吐量,同时避免了通过公共互联网传输数据。
要为 Kinesis 设置 VPC 接口端点,请完成以下步骤:
在 Amazon VPC 控制台中,选择与您的 AWS DMS 复制实例相同的 AWS 区域。在导航窗格中选择 Endpoints。选择 Create endpoint。在 Service 中选择 kinesisstreams 服务。例如,comamazonawsuseast1kinesisstreams。在 VPC 中,选择与您的 AWS DMS 复制实例相同的 VPC 以创建端点。在 Subnets 中,选择您用于部署 AWS DMS 复制实例的相同子网。在 Security groups 中,选择用于部署 AWS DMS 复制实例的相同安全组。要指定访问控制,在 Policy 中选择 Full access,或者选择 Custom 使用策略创建工具指定自己的访问控制。在 Endpoints 中,确认您新创建的 VPC 端点状态为 Available。在 Kinesis 中,数据流由不同的分片组成,记录按分片顺序排列。AWS DMS 使用表映射规则将数据从源映射到目标 Kinesis 数据流。数据记录的 partitionkeytype 可能的值包括 schematable、transactionid、primarykey、constant 和 attributename。必须选择正确的映射规则,以确保数据被高效迁移,并避免目标延迟。
在本场景中,我们从名为 employee 的表迁移数据,并演示选择某一列对 AWS DMS 任务延迟的影响。请完成以下步骤:
在源 (PostgreSQL) 实例上使用以下表 DDL 创建 employee 表:sql create table employee (employeeid serial not null primary key employeename varchar(64) deptno varchar(3))
创建一个 Kinesis 流,并将容量模式设置为按需模式。
创建一个 AWS DMS 任务,使用选项 Migrate existing data and replicate ongoing changes 或 Replicate data changes only。
设置 AWS DMS 任务,使用以下表映射规则迁移 public 架构下的 employee 表。将 ParallelApplyThreads 设置为 32,将 ParallelApplyQueuesPerThread 设置为 512,将 partitionkeytype 设置为 attributename,将表中的 deptno 设置为将数据分布到分片。以下映射规则确保属于同一部门的员工数据将被分组到同一个分片:json { rules [ { ruletype selection ruleid 1 rulename 1 ruleaction include objectlocator { schemaname public tablename employee } } { ruletype objectmapping ruleid 2 rulename TransformToKinesis ruleaction maprecordtorecord targettablename CustomerData objectlocator { schemaname public tablename employee } mappingparameters { partitionkeytype attributename partitionkeyname rootkey attributemappings [ { targetattributename rootkey attributetype scalar attributesubtype string value {deptno} } ] } } ] }
在正在进行的复制阶段,请在 PostgreSQL 数据库中插入以下记录:sql insert into employee (employeename deptno) (select name generateseries(1 1000000)text 1)
在上述 DML 语句中,您插入了 100 万条记录到 employee 表,并将每位员工分配到部门编号 1。在此示例中,所有 100 万条记录都将移入同一个 Kinesis 分片,因为它们都属于同一部门 (1)。
现在您可以查看记录将如何根据映射规则进行迁移。为此,您可以使用 Amazon CloudWatch 指标。有关更多信息,请参阅 复制任务指标。
如以下图所示的 CDCIncomingChanges 指标所示,插入了 100 万条更改,并且处理这些更改大约花费了 3 小时,吞吐量为 92 条记录/秒。
记录正在累积在 AWS DMS 复制实例的底层存储上,等待应用到目标,如 CDCChangesDiskTarget 指标所示。
CDCLatencyTarget 指标显示目标延迟的增加,达到了 9893 秒。虽然您选择了按需模式的数据流,但目标延迟在增加,因为您插入的所有员工都具有相同的部门编号,因此数据被移动到同一分片。
现在您可以重复同样的过程,但输入以下映射规则,其中在属性映射中选择 employeeid 而不是 deptno:
json{ rules [ { ruletype selection ruleid 1 rulename 1 ruleaction include objectlocator { schemaname public tablename employee } } { ruletype objectmapping ruleid 2 rulename TransformToKinesis ruleaction maprecordtorecord targettablename CustomerData objectlocator { schemaname public tablename employee } mappingparameters { partitionkeytype attributename partitionkeyname rootkey attributemappings [ { targetattributename rootkey attributetype scalar attributesubtype string value {employeeid} } ] } } ]}
我们可以得出结论,属性从 deptno 更改为 employeeid 后,可以更优化地将数据分布到分片中。CDCLatencyTarget 指标显示,由于更改延迟从 9893 秒降低到 24 秒。一百万条记录处理的时间仅为 24 秒,吞吐量达到 416666 条记录/秒。
因此,理解源系统中 DML 语句的特性,并选择适当的 partitionkeytype 设置至关重要。
如果源上有多个表,每个表的主键范围有限,可以使用 endpoint 设置 PartitionIncludeSchemaTable,此设置会在分区值前添加架构和表名,从而增加 Kinesis 分片之间的数据分布。例如,假设一个 sysbench 架构下有成千上万的表,而每个表的主键范围有限。在这种情况下,相同的主键将从成千上万的表发送到同一个分片,这会导致限流。
在 Kinesis 中,最大吞吐量取决于为流配置的分片数量。每个分片支持最高 1MB/秒 或 1000 条记录/秒的写入吞吐量,或最高 2MB/秒 或 2000 条记录/秒的读取吞吐量。
在这个场景中,您将从先前创建的 employee 表迁移数据,并演示在 Kinesis 上未配置足够分片如何影响 AWS DMS 任务。请完成以下步骤:
创建一个 Kinesis 数据流,并选择按量能力模式。在本例中,您在目标上配置了 8 个分片,支持最高 8000 条记录/秒的写入吞吐量。创建一个 AWS DMS 任务,使用选项 Migrate existing data and replicate ongoing changes 或 Replicate data changes only。设置 AWS DMS 任务,使用以下表映射规则迁移 public 架构下的 employee 表。将 partitionkeytype 设置为 attributename,从表中选择 employeeid,以均匀分布数据到各个分片:json{ rules [ { ruletype selection ruleid 1 rulename 1 ruleaction include objectlocator { schemaname public tablename employee } } { ruletype objectmapping ruleid 2 rulename TransformToKinesis ruleaction maprecordtorecord targettablename CustomerData objectlocator { schemaname public tablename employee } mappingparameters { partitionkeytype attributename partitionkeyname rootkey attributemappings [ { targetattributename rootkey attributetype scalar attributesubtype string value {employeeid} } ] } } ]}
在正在进行的复制阶段,运行以下命令向员工表插入 1000 万条记录:sqlinsert into employee (employeename deptno) (select name generateseries(1 10000000)text 1)
由于您未在 Kinesis 上配置足够的分片,CDCChangesDiskTarget 指标显示行正在累积在磁盘上,并等待提交。
在 CloudWatch 日志 中,您可以找到如下片段,显示当交换空间超过 1GB 时,从源读取被暂停,从而减缓复制速度:
20230606T150047 [SORTER ]I Reading from source is paused Total storage used by swap files exceeded the limit 1048576000 bytes (sortertransactionc110)
加速器永久免费版下载安装我们可以得出结论,除了选择高效的 partitionkeytype 设置外,您还需为 Kinesis 提供足够的分片,以避免 AWS DMS 复制实例上的交换并确保数据的最佳加载。
为监控目的,使用 CloudWatch 指标 WriteProvisionedThroughputExceeded 监控由于吞吐量限制而被拒绝的记录数,该指标包括来自 AWS DMS 任务的 PutRecord 和 PutRecords 操作的限制。AWS DMS 自动重试 PutRecord 和 PutRecords,在 347 版本中超时为 5 分钟,在 351 版本中超时为 30 分钟。当由于 Kinesis 端容量不足导致频繁限流时,可能在重试超时后导致数据在迁移过程中丢失。根据需求在数据流中选择按需模式或按量模式,并确保 Kinesis 上有足够的分片。
在 Kinesis 中,记录的数据负载最大为 1 MB。在此场景中,您将从一个名为 sample 的表迁移数据,并观察迁移超过 1 MB 的大对象 (LOB) 时会发生什么。请完成以下步骤:
在源PostgreSQL实例上使用以下表 DDL 创建 sample 表:sql CREATE TABLE sample ( id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY val TEXT NOT NULL )
创建一个 Kinesis 流,并选择按量能力模式。在此示例中,您配置了 8 个分片,支持最大 8000 条记录/秒的写入吞吐量。创建一个 AWS DMS 任务,使用选项 Migrate existing data and replicate ongoing changes 或 Replicate data changes only。在定义 对象映射 时选择可用的一个 partitionkeytype 设置。在进行的复制阶段,运行以下命令向 sample 表中插入一些随机文本:sql INSERT INTO publicsample (val) SELECT repeat(md5(random()TEXT) 10000065 ceil(random() 25)INT) FROM generateseries(1 1) x
以下 CloudWatch 片段显示 AWS DMS 跳过了该记录,因为数据超过 1 MB 限制:
20231026T19