在这篇文章中,我们将探讨如何使用 AWS IoT SiteWise 的 BulkImportJob API 将历史设备数据导入到 AWS IoT SiteWise。这一过程能够帮助企业确保数据连续性,训练 AI 和 ML 模型,并获得可行动的洞察。读者需要准备好 AWS 账户、所需权限以及一定的 Python 使用经验,以便顺利完成操作。
AWS IoT SiteWise 是一项托管服务,帮助客户以大规模收集、存储、组织和监控来自工业设备的数据。客户常常需要将现有系统如数据历史记录器和时间序列数据库中的历史设备测量数据导入到 AWS IoT SiteWise,以确保数据连续性、训练能够预测设备故障的 AI 和 ML 模型,并获得可行见解。
在本文中,我们将展示如何开始使用 BulkImportJob API,并使用 代码示例 将历史设备数据导入到 AWS IoT SiteWise。
您可以使用导入的数据通过 AWS IoT SiteWise Monitor 和 Amazon Managed Grafana 获取洞察,或者在 Amazon Lookout for Equipment 和 Amazon SageMaker 上训练 ML 模型,从而推动分析应用程序的发展。
要开始批量导入,客户需要将包含历史数据的 CSV 文件上传到 Amazon Simple Storage Service (Amazon S3) 中,文件需要按照预定义格式组织。在上传 CSV 文件后,客户可以使用 CreateBulkImportJob 操作异步启动导入 AWS IoT SiteWise,并通过 DescribeBulkImportJob 和 ListBulkImportJob 操作监控进度。

要跟随本文,您需要一个 AWS 账户 和 AWS IoT SiteWise 支持的区域。如果您已经在使用 AWS IoT SiteWise,请选择一个不同的区域来进行隔离环境的创建。同时,您还需要对 Python 有一定的熟悉度。
bash sudo yum install git git clone https//githubcom/awssamples/awsiotsitewisebulkimportexamplegit cd awsiotsitewisebulkimportexample pip3 install r requirementstxt
在这篇博客中,我们将使用 AWS Cloud9 实例来代表一个本地开发工作站,并模拟两个月的历史数据,针对某家汽车制造设施的几个生产线。
接下来,我们将准备数据并将其大规模导入 AWS IoT SiteWise,利用多个批量导入作业。最后,我们将验证数据是否成功导入。
批量导入作业可以将数据导入到 AWS IoT SiteWise 提供的两个存储层级,具体取决于存储的配置。在我们继续之前,首先要定义这两个存储层级。
存储层级描述热存储层存储频繁访问的数据,具有较低的写入到读取延迟,适合于操作仪表盘、报警管理系统及其他需要快速访问设备最近测量值的应用。冷存储层存储不太频繁访问的数据,具有较高的读取延迟,适合于需要访问历史数据的应用,比如商业智能 (BI) 仪表盘、人工智能 (AI) 和机器学习 (ML) 培训。为存储冷存储层的数据,AWS IoT SiteWise 使用客户账户中的 S3 桶。保留期限:决定您的数据在热存储层中存储多长时间后被删除。
在接下来的部分中,我们将按照步骤逐步导入历史设备数据到 AWS IoT SiteWise。
为了演示的目的,我们将为一个虚构的汽车制造商创建一个样本资产层级,该制造商在四个不同城市运营。在实际场景中,您或许已经在 AWS IoT SiteWise 中拥有一个现有资产层级,这样这一步骤就是可选的。
11 查看配置从终端转到 Git 仓库跟目录。查看资产模型和资产的配置。bash cat config/assetsmodelsyml
查看资产属性的架构。bash cat schema/samplestampingpresspropertiesjson
12 创建资产模型和资产运行 python3 src/createassethierarchypy 自动创建资产模型、层级定义、资产及资产关联。在 AWS 控制台 导航至 AWS IoT SiteWise,验证新创建的 模型 和 资产。验证您是否看到类似如下的资产层次结构:在此步骤中,我们将模拟两个月的历史数据,针对四台冲压机,跨越两条生产线。在实际场景中,这些数据通常来自于源系统,如数据历史记录器和时间序列数据库。
CreateBulkImportJob API 具备以下关键需求:
要识别资产属性,您需要指定 ASSETID PROPERTYID 组合或者 ALIAS。在本博客中,我们将使用前者。数据需以 CSV 格式提供。以下是生成数据的步骤,满足这些需求。有关架构的更多详细信息,请参考 使用 CreateBulkImportJob API 导入数据。
查看数据模拟配置。bash cat config/datasimulationyml
运行 python3 src/simulatehistoricaldatapy 为所选属性和时间段生成模拟历史数据。如果总行数超过 rowsperjob在 bulkimportyml 中配置,将会创建多个数据文件以支持并行处理。在此示例中,为四台冲压机 (AD) 生成了约 700000 数据点,覆盖两条生产线 (SampleLine 1 和 SampleLine 2)。因为我们将 rowsperjob 配置为 20000,所以将会创建总计 36 个数据文件。
验证在 data 目录下生成的数据文件。 数据架构将遵循 bulkimportyml 配置文件中配置的 columnnames。csv79817017bb134611b4b28094913cd287c487c0d7a9f24fe7b4bc92bf6f4f697bDOUBLE16672752000GOOD787679817017bb134611b4b28094913cd287c487c0d7a9f24fe7b4bc92bf6f4f697bDOUBLE16672752600GOOD673379817017bb134611b4b28094913cd287c487c0d7a9f24fe7b4bc92bf6f4f697bDOUBLE16672753200GOOD821379817017bb134611b4b28094913cd287c487c0d7a9f24fe7b4bc92bf6f4f697bDOUBLE16672753800GOOD727279817017bb134611b4b28094913cd287c487c0d7a9f24fe7b4bc92bf6f4f697bDOUBLE16672754400GOOD6145
22 将历史数据上传至 Amazon S3由于 AWS IoT SiteWise 需要将历史数据上传至 Amazon S3,因此我们将模拟数据上传至所选的 S3 桶。
在 bulkimportyml 中更新数据桶,使用任何现存可稍后删除的临时 S3 桶。运行 python3 src/uploadtos3py 将模拟的历史数据上传至配置的 S3 桶。导航至 Amazon S3 验证对象是否已成功上传。在您可以导入历史数据之前,AWS IoT SiteWise 需要您启用冷存储。有关详细信息,请参阅 配置存储设置。
如果您已经激活了冷存储,请考虑将 S3 桶修改为一个可以稍后删除的临时桶。在更改 S3 桶时,请确保与 S3 访问角色下配置的 IAM 角色拥有权限访问新 S3 桶。
加速器永久免费版下载安装31 配置存储设置导航至 AWS IoT SiteWise,选择 存储,然后选择 激活冷存储。选择您所需的 S3 桶位置。 选择 从 AWS 管理模板创建角色。勾选 激活保留期限,输入 30 天,并保存。 32 赋予 AWS IoT SiteWise 读取 Amazon S3 数据的权限导航至 AWS IAM,选择 策略 在 访问管理 下,点击 创建策略。切换到 JSON 选项卡,将内容替换为以下内容。将 lt bucketnamegt 更新为在 bulkimportyml 中配置的数据 S3 桶名称。json{ Version 20121017 Statement [ { Effect Allow Action [ s3 ] Resource [arnawss3ltbucketnamegt] } ]}
保存策略,命名为 SiteWiseBulkImportPolicy。选择 角色 在 访问管理 下,点击 创建角色。选择 自定义信任策略 将内容替换为以下内容。json{ Version 20121017 Statement [ { Sid Effect Allow Principal { Service iotsitewiseamazonawscom } Action stsAssumeRole } ]}
点击 下一步,选择在前面步骤中创建的 SiteWiseBulkImportPolicy IAM 策略。点击 下一步 以创建角色,角色名称 输入为 SiteWiseBulkImportRole。选择 角色 在 访问管理 下,搜索新创建的 IAM 角色 SiteWiseBulkImportRole,点击其名称。复制 IAM 角色的 ARN。33 创建 AWS IoT SiteWise 批量导入作业在 config/bulkimportyml 中将 rolearn 字段替换为在前面步骤中复制的 SiteWiseBulkImportRole IAM 角色的 ARN。更新 config/bulkimportyml 文件:将 rolearn 替换为 ARN 的 SiteWiseBulkImportRole IAM 角色。将 errorbucket 替换为可以稍后删除的任何现存的临时 S3 桶。运行 python3 src/createbulkimportjobpy 将历史数据从 S3 桶中导入 AWS IoT SiteWise:脚本将创建多个作业以并行导入所有创建的数据文件到 AWS IoT SiteWise。在现实世界中,可以快速将数 TB 的数据导入 AWS IoT SiteWise。从输出中检查作业状态:bash Total S3 objects 36 Number of bulk import jobs to create 36 Created job 03e75fb21275487fa0115ae6717e0c2e for importing data from data/historicaldata1csv S3 object Created job 7938c0d2f177497989592536b46f91b3 for importing data from data/historicaldata10csv S3 object Checking job status every 5 secs until completion Job id 03e75fb21275487fa0115ae6717e0c2e status COMPLETED Job id 7938c0d2f177497989592536b46f91b3 status COMPLETED
如果您看到任何作业的状态为 COMPLETEDWITHFAILURES 或 FAILED,请参考常见问题排查部分。一旦批量导入作业完成,我们需要验证历史数据是否成功导入到 AWS IoT SiteWise。您可以通过直接查看冷存储或通过 AWS IoT SiteWise Monitor 可视化图表进行验证。
41 使用冷存储在这个步骤中,我们将检查是否在配置的冷存储的桶中创建了新的 S3 对象。
导航至 Amazon S3,找到在 AWS IoT SiteWise 下配置的 S3 桶 存储 S3 桶位置在步骤3。验证在 raw/ 前缀下的分区和对象。 42 使用 AWS IoT SiteWise Monitor在此步骤中,我们将可视化检查图表是否显示导入日期范围内的数据。
导航至 AWS IoT SiteWise 并找到 Monitor。创建一个门户以访问存储在 AWS IoT SiteWise 中的数据。将 AnyCompany Motor 作为 门户名称。选择 IAM 作为 用户认证。输入您的电子邮件地址作为 支持联系邮件,然后点击