概述

BigQuery 是 Google Cloud 提供的企业级无服务器数据仓库解决方案。本文档基于实际调研,总结 BigQuery 在数据仓库场景下的应用方案,特别是如何高效处理变更数据捕获(CDC)场景。

核心特点:

  • 📊 列式存储,适合大规模数据分析
  • ⚡ 无服务器架构,自动扩展
  • 💰 按查询量计费
  • 🔄 支持实时数据流和批处理

背景知识

BigQuery 的特殊性

主键约束

BigQuery 可以定义主键(Primary Key)和外键(Foreign Key),但与传统关系型数据库不同:

  • 非强一致性:主键不会阻止重复数据插入
  • 无唯一键:不存在 UNIQUE 约束
  • 数据叠加:相同主键的数据会累加,而非覆盖

示例:

1
2
3
4
5
6
7
8
9
10
-- 定义主键(声明式,不强制)
CREATE TABLE `project.dataset.table` (
id INT64 NOT NULL,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
);

-- 即使有主键,以下操作依然会成功
INSERT INTO `project.dataset.table` VALUES (1, 'Alice');
INSERT INTO `project.dataset.table` VALUES (1, 'Bob'); -- 不会报错!

更新操作的限制

BigQuery 不建议频繁执行 UPDATEDELETE 操作:

性能问题:

  • 📈 数据更新开销大:列式存储需要重新组织数据
  • 💸 资源消耗高:即使小批量更新也可能触发大规模重写
  • ⏱️ 延迟增加:特别是大数据量场景

架构限制:

  • ✅ 更适合追加(Append-only)方式
  • ⚠️ 分区表更新会触发分区重算
  • 🔄 更新成本远高于传统 RDBMS

数据仓库分层架构

典型的数据仓库分层设计:

1
2
3
4
5
6
7
ODS (Operational Data Store) - 操作数据层

DWD (Data Warehouse Detail) - 明细数据层

DWS (Data Warehouse Summary) -汇总数据层

ADS (Application Data Service) - 应用数据层

方案对比

传统大数据方案

基于 Hadoop 生态的传统方案:

特点:

  • 🏗️ 需要搭建完整的大数据架构
  • 📚 学习成本高
  • 💰 运维成本高
  • ⚙️ 灵活性强,可定制性高

BigQuery 方案对比

方案适用场景优点缺点
方案一:Storage API + CDC高频更新,实时性要求高性能最优,官方推荐配置复杂,有延迟
方案二:定期 Merge中等频率更新平衡性能和成本存在数据延迟
方案三:直接更新低频更新简单直接大量更新性能差

方案一:BigQuery Storage API + CDC

方案概述

官方推荐的 CDC 解决方案,使用 BigQuery Storage Write API 实现高性能数据变更捕获。

核心组件:

  • 📥 BigQuery Storage Write API(数据写入)
  • 🔄 变更数据捕获机制(CDC)
  • 📊 分区表(查询优化)
  • 💾 物化视图(成本优化)

前提条件

启用必要的 API:

  1. 在 Google Cloud Console 进入 API & Services
  2. 搜索并启用 BigQuery Storage API
  3. 配置相应的服务账号权限

权限要求:

1
2
3
4
# 所需角色
- BigQuery Data Editor
- BigQuery Job User
- Storage Admin(如果涉及 GCS)

Storage API 优势

与传统 REST API 对比:

指标BigQuery Storage APIREST API
首次处理耗时4 秒9 秒
完整处理耗时2 分 32 秒1 小时 43 分 46 秒
费用$5$4.1

来源: Cloud Ace 性能测试

实现步骤

步骤一:创建 CDC 表

1
2
3
4
5
6
7
8
9
10
11
12
13
-- 创建带主键的 CDC 表
CREATE TABLE `project-id.dataset.cdc_table` (
id INT64 NOT NULL,
name STRING,
email STRING,
updated_at TIMESTAMP,
_CHANGE_TYPE STRING -- CDC 类型标记
)
PARTITION BY DATE(updated_at)
OPTIONS(
require_partition_filter = true
)
PRIMARY KEY (id) NOT ENFORCED;

步骤二:使用 Storage Write API

Java 示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import com.google.cloud.bigquery.storage.v1.*;
import com.google.protobuf.Descriptors.Descriptor;

public class StorageWriteExample {

public void writeDataWithCDC(String projectId, String datasetId, String tableId)
throws Exception {

try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
// 构建表路径
TableName tableName = TableName.of(projectId, datasetId, tableId);

// 创建写入流
WriteStream stream = WriteStream.newBuilder()
.setType(WriteStream.Type.COMMITTED)
.build();

CreateWriteStreamRequest createRequest = CreateWriteStreamRequest.newBuilder()
.setParent(tableName.toString())
.setWriteStream(stream)
.build();

WriteStream writeStream = client.createWriteStream(createRequest);

// 写入数据
try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(
writeStream.getName(), writeStream.getTableSchema())
.build()) {

JSONArray jsonData = new JSONArray();
JSONObject record = new JSONObject();
record.put("id", 1);
record.put("name", "Alice");
record.put("email", "alice@example.com");
record.put("updated_at", System.currentTimeMillis());
record.put("_CHANGE_TYPE", "UPSERT");
jsonData.put(record);

ApiFuture<AppendRowsResponse> future = writer.append(jsonData);
AppendRowsResponse response = future.get();

System.out.println("Rows written: " + response.getAppendResult().getOffset());
}
}
}
}

Python 示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
import json

def write_data_with_cdc(project_id, dataset_id, table_id):
"""使用 Storage Write API 写入 CDC 数据"""

client = bigquery_storage_v1.BigQueryWriteClient()

# 构建表路径
parent = f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"

# 创建写入流
write_stream = types.WriteStream()
write_stream.type_ = types.WriteStream.Type.COMMITTED

request = types.CreateWriteStreamRequest(
parent=parent,
write_stream=write_stream
)

stream = client.create_write_stream(request=request)

# 准备数据
rows = [
{
"id": 1,
"name": "Alice",
"email": "alice@example.com",
"updated_at": "2025-04-29T10:00:00",
"_CHANGE_TYPE": "UPSERT"
}
]

# 写入数据
serialized_rows = [json.dumps(row).encode('utf-8') for row in rows]

request = types.AppendRowsRequest(
write_stream=stream.name,
rows=types.AppendRowsRequest.ProtoData(
rows=types.ProtoRows(
serialized_rows=serialized_rows
)
)
)

response = client.append_rows(iter([request]))

print(f"Rows written: {response.result().offset}")

步骤三:配置分区表

时间分区(推荐用于日志数据):

1
2
3
4
5
6
7
8
9
10
CREATE TABLE `project-id.dataset.logs_table` (
log_id INT64,
message STRING,
event_time TIMESTAMP
)
PARTITION BY DATE(event_time)
OPTIONS(
partition_expiration_days = 90, -- 分区自动过期
require_partition_filter = true -- 强制分区过滤
);

范围分区(推荐用于 ID 分组):

1
2
3
4
5
6
7
8
9
CREATE TABLE `project-id.dataset.user_table` (
user_id INT64,
name STRING,
region STRING
)
PARTITION BY RANGE_BUCKET(user_id, GENERATE_ARRAY(0, 1000000, 10000))
OPTIONS(
require_partition_filter = true
);

分区类型对比:

特性TimePartitioningRangePartitioning
字段类型DATETIMESTAMPINT64
分区依据时间范围数值范围
典型场景日志、时间序列数据用户分组、订单编号
查询优化按时间范围查询按数值范围查询

步骤四:创建物化视图

物化视图用于优化查询性能和降低成本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
-- 创建物化视图
CREATE MATERIALIZED VIEW `project-id.dataset.mv_daily_summary`
PARTITION BY event_date
AS
SELECT
DATE(event_time) AS event_date,
user_id,
COUNT(*) AS event_count,
SUM(amount) AS total_amount
FROM `project-id.dataset.events`
GROUP BY event_date, user_id;

-- 查询物化视图(自动使用预计算结果)
SELECT *
FROM `project-id.dataset.mv_daily_summary`
WHERE event_date = '2025-04-29';

物化视图优势:

  • ⚡ 预计算结果,查询更快
  • 💰 减少扫描量,降低成本
  • 🔄 自动增量更新

限制与注意事项

CDC 表限制:

  • ❌ 无法支持 Search Index
  • ⏱️ 需结合 max_staleness 参数使用
  • 📊 存在查询延迟(通常几秒到几分钟)

max_staleness 使用:

1
2
3
4
5
6
7
-- 允许最多 15 分钟的数据延迟
SELECT *
FROM `project-id.dataset.cdc_table`
WHERE updated_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
OPTIONS(
max_staleness = INTERVAL 15 MINUTE
);

方案二:定期 Merge

方案概述

采用分层架构,定期执行 MERGE 操作同步数据。

架构分层:

  • 实时层(Staging):存储最新的 CDC 记录
  • 批处理层(Base):存储历史稳定数据

实现步骤

步骤一:创建实时表和基础表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
-- 实时表(临时存储 CDC 数据)
CREATE TABLE `project-id.dataset.staging_item` (
id INT64,
category_id INT64,
category_log STRING,
create_date TIMESTAMP,
operation_type STRING, -- INSERT/UPDATE/DELETE
sync_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
)
PARTITION BY DATE(sync_time);

-- 基础表(最终数据)
CREATE TABLE `project-id.dataset.item` (
id INT64,
category_id INT64,
category_log STRING,
create_date TIMESTAMP,
updated_at TIMESTAMP
)
PARTITION BY DATE(create_date);

步骤二:实时插入 CDC 记录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
-- 插入更新操作
INSERT INTO `project-id.dataset.staging_item` (
id,
category_id,
category_log,
create_date,
operation_type
)
VALUES (
12345,
67890,
'Category updated',
CURRENT_TIMESTAMP(),
'UPDATE'
);

步骤三:定期执行 MERGE

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
-- MERGE 操作(处理 INSERT/UPDATE/DELETE)
MERGE `project-id.dataset.item` AS target
USING `project-id.dataset.staging_item` AS source
ON target.id = source.id
-- 处理 UPDATE
WHEN MATCHED AND source.operation_type = 'UPDATE' THEN
UPDATE SET
target.category_id = source.category_id,
target.category_log = source.category_log,
target.updated_at = source.create_date
-- 处理 DELETE
WHEN MATCHED AND source.operation_type = 'DELETE' THEN
DELETE
-- 处理 INSERT
WHEN NOT MATCHED AND source.operation_type = 'INSERT' THEN
INSERT (id, category_id, category_log, create_date, updated_at)
VALUES (source.id, source.category_id, source.category_log, source.create_date, source.sync_time);

步骤四:清理已处理数据

1
2
3
-- 删除已合并的数据
DELETE FROM `project-id.dataset.staging_item`
WHERE sync_time < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR);

调度策略

使用 Cloud Scheduler + Cloud Functions:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from google.cloud import bigquery
import functions_framework

@functions_framework.http
def merge_cdc_data(request):
"""定期执行 MERGE 操作的云函数"""

client = bigquery.Client()

merge_query = """
MERGE `project-id.dataset.item` AS target
USING `project-id.dataset.staging_item` AS source
ON target.id = source.id
WHEN MATCHED AND source.operation_type = 'UPDATE' THEN
UPDATE SET
target.category_id = source.category_id,
target.updated_at = source.create_date
WHEN MATCHED AND source.operation_type = 'DELETE' THEN
DELETE
WHEN NOT MATCHED AND source.operation_type = 'INSERT' THEN
INSERT (id, category_id, create_date, updated_at)
VALUES (source.id, source.category_id, source.create_date, source.sync_time)
"""

job = client.query(merge_query)
job.result() # 等待完成

return f"Merged {job.total_bytes_processed} bytes"

调度频率建议:

  • 实时性要求高:每 5-15 分钟
  • 一般场景:每小时
  • 低频场景:每天

方案三:直接更新

适用场景

  • 📝 更新频率低(每天数百次以内)
  • 📊 数据量小(百万级以下)
  • ⏰ 对实时性要求不高

基本操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
-- 更新单条记录
UPDATE `project-id.dataset.item`
SET
category_id = 234,
category_log = 'Updated category',
updated_at = CURRENT_TIMESTAMP()
WHERE id = 1;

-- 批量更新
UPDATE `project-id.dataset.item`
SET
status = 'inactive',
updated_at = CURRENT_TIMESTAMP()
WHERE last_login < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 90 DAY);

-- 删除操作
DELETE FROM `project-id.dataset.item`
WHERE status = 'deleted'
AND updated_at < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY);

优化建议

使用分区过滤:

1
2
3
4
5
6
7
8
9
10
-- ✅ 好的查询(使用分区过滤)
UPDATE `project-id.dataset.item`
SET status = 'processed'
WHERE DATE(create_date) = '2025-04-29'
AND id IN (1, 2, 3);

-- ❌ 差的查询(全表扫描)
UPDATE `project-id.dataset.item`
SET status = 'processed'
WHERE id IN (1, 2, 3);

成本优化策略

查询成本控制

使用分区和聚簇:

1
2
3
4
5
6
7
8
9
10
11
12
CREATE TABLE `project-id.dataset.optimized_table` (
id INT64,
user_id INT64,
event_name STRING,
event_time TIMESTAMP,
amount NUMERIC
)
PARTITION BY DATE(event_time)
CLUSTER BY user_id, event_name
OPTIONS(
require_partition_filter = true
);

好处:

  • 📉 减少扫描的数据量
  • 💰 降低查询成本(按扫描量计费)
  • ⚡ 提升查询性能

使用物化视图

1
2
3
4
5
6
7
8
9
10
-- 创建物化视图降低重复查询成本
CREATE MATERIALIZED VIEW `project-id.dataset.mv_user_stats`
AS
SELECT
user_id,
COUNT(*) AS event_count,
SUM(amount) AS total_amount,
MAX(event_time) AS last_event_time
FROM `project-id.dataset.events`
GROUP BY user_id;

监控和配额

设置项目配额:

  1. 进入 BigQuery Console
  2. 点击项目设置
  3. 设置每日查询配额限制

查询成本估算:

1
2
3
4
5
6
-- 使用 DRY RUN 估算查询成本
#standardSQL
--dry_run
SELECT *
FROM `project-id.dataset.large_table`
WHERE DATE(timestamp) >= '2025-01-01';

最佳实践总结

选择合适的方案

场景推荐方案原因
高频实时更新方案一:Storage API + CDC性能最优
中频批量更新方案二:定期 Merge平衡性能和成本
低频小批量更新方案三:直接更新简单直接

通用建议

数据建模:

  • ✅ 优先使用追加模式
  • ✅ 合理使用分区表
  • ✅ 对高基数列使用聚簇
  • ❌ 避免频繁 UPDATE/DELETE

查询优化:

  • ✅ 始终使用分区过滤
  • ✅ 选择必要的列,避免 SELECT *
  • ✅ 使用物化视图缓存常用查询
  • ❌ 避免跨分区的大范围查询

成本控制:

  • 💰 设置查询配额
  • 📊 监控查询成本
  • 🔄 定期清理旧数据
  • 📅 使用分区过期策略

参考资源

官方文档

集成与工具

实践案例