“使用谷歌的技术在 GCP 上构建流管道非常简单”
目录
第 1 步:构建客户端
第 2 步:数据注入
第 3 步:数据处理
第 4 步:数据落盘
第 1 步:构建客户端
Pubsub 的客户端可以是任何数据生产者,例如移动应用程序事件、用户行为埋点、数据库更改(CDC)、传感器数据等。
Cloud PubSub 提供许多客户端库,例如 Java、C#、C++、Go、Python、Ruby、PHP 和 REST API。
可以轻松地将客户端代码集成到数据生产者中,以将数据发布到 Cloud PubSub
这是Java客户端代码的示例
该客户端代码使用 forever while 循环模拟流事件。这些事件随后被转换为字节串。
使用我们需要发布的事件创建和初始化 PubSub 消息。
最后将事件发布到指定的 PubSub 主题。
发布事件后,我们输出pubsub分配和返回的已发布消息ID。
简单的 PubSub 客户端代码
现在转到您的 pubsub 客户端根项目并执行 maven 以运行客户端。
/* export service account prior to run below command to provide access to various services export GOOGLE_APPLICATIONS_CREDENTIALS=~/service_account.json*/ |
runnig maven 执行命令后,您可以查看有关已发布 message_id 的以下日志
在控制台上发布的消息 ID
现在让我们在第2步中检查消息是否确实发布到云 pubsub
第 2 步:数据摄取
通过单击云控制台左侧的菜单导航到 Cloud Pubsub
转到 Cloud Pubsub 并单击查看消息,如下所示
PubSub 控制台
弹出弹窗供您选择订阅,如果您已有订阅,请选择订阅或为该主题创建一个。
点击拉取消息,您应该可以看到发布在 Cloud Pubsub 主题中的消息
在 Cloud PubSub 中查看消息
现在我们验证了数据确实发布在 Cloud Pubsub 中。让我们在第3步中使用 dataflow 处理读取和处理记录。
第 3 步:数据处理
Cloud Dataflow 是统一的分布式处理引擎,它使用 apache beam sdk 来构建数据管道(批处理和实时)。
在本示例中,我们将构建实时管道。
目标是从 PubSub 读取发布的消息,验证记录并过滤“Hello Streaming”和 Interger ex。 “1”,最后写入 bigquery 中的目标表。
管道代码结构
数据处理逻辑
现在我们可以使用 maven 命令执行上述数据流管道
/* export service account prior to run below command to provide access to various services export GOOGLE_APPLICATIONS_CREDENTIALS=~/service_account.json*/$mvn -Pdataflow-runner compile exec:java -Dexec.mainClass=org.apache.beam.examples.PubsubToBQ -Dexec.args="--project=$PROJECT_ID --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_NAME --stagingLocation=$CLOUD_STPRAGE_BUCKET --tempLocation=$CLOUD_STPRAGE_BUCKET/temp --runner=DataflowRunner --region=us-central1" |
执行此命令后,让我们检查 Dataflow 控制台以查看我们的管道
现在让我们从我们在第1步中构建的 pubsub 客户端生成一些消息
所有成功的消息都应该能够通过数据流流作业处理。 如果您单击 DAG 中的“Reading From PubSub”步骤,您可以看到它到目前为止已读取 218 个元素。
现在让我们进入下一步以验证数据流是否成功处理数据并将其写入 BigQuery。
第 4 步:数据落盘
通过单击 Google Cloud 控制台中的菜单并单击左侧菜单栏中的 BigQuery 转到 BigQuery。
如果我们导航到我们的目标表 hello_string ,我们将看到确实有 218 个元素流式传输到 BigQuery 的字符串缓冲区,这验证了发布到 pubsub 的所有元素都已成功写入 BigQuery。
现在让我们深入了解表格以查看数据。
数据已成功写入 BigQuery。 您还可以通过运行以下查询来验证最大计数为 217(因为消息计数为 0 索引)
SELECT max(Count) FROM `$PROJECT_ID.$TABLE_NAME.hello_string`; |
结论
在 GCP 中编写实时流式传输管道并不困难。 由于我们在演示中使用的所有服务都是无服务器的,因此无需安装即可轻松开始构建 POC。
文章信息
相关推荐
