Redis-Stack 使用

Redis-Stack 使用

1. Redis-Stack 是什么

Redis-Stack 是一个包含了各种 Redis 模块的综合性发行版,旨在简化开发者的使用和部署。它不仅仅是经典的 Redis 数据库,还集成了 Redis 最受欢迎的扩展功能,例如:

  • RedisJSON:支持存储、更新和检索 JSON 文档,让 Redis 不仅仅是键值存储,还能高效处理半结构化数据。
  • RedisSearch:提供强大的全文搜索、聚合和二级索引功能,使得在 Redis 中实现复杂查询变得轻而易举。
  • RedisGraph:一个图数据库模块,用于存储和查询图数据,支持 Cypher 查询语言,非常适合处理社交网络、推荐系统等场景。
  • RedisTimeSeries:用于存储和查询时间序列数据,支持聚合查询和下采样,适用于监控、IoT 等领域。
  • RedisBloom:提供了布隆过滤器 (Bloom Filter) 和 Cuckoo 过滤器等概率数据结构,用于高效地检查元素是否存在,常用于去重和缓存判断。

Redis-Stack 的出现极大地简化了开发者在 Redis 上构建复杂应用的过程,无需单独安装和配置这些模块,开箱即用,提升了开发效率和部署便利性。

2. RedisJSON 使用

RedisJSON 模块让 Redis 能够像文档数据库一样处理 JSON 数据。它提供了丰富的命令来操作 JSON 文档的各个部分,支持嵌套结构和路径查询。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import redis

# 连接到 Redis
r = redis.Redis()

# 示例数据
data = {
'dog': {
'scientific-name': 'Canis familiaris',
'breed': 'Labrador',
'age': 3,
'toys': ['ball', 'frisbee']
},
'cat': {
'scientific-name': 'Felis catus',
'breed': 'Siamese',
'age': 2
}
}

# 使用 json().set() 存储 JSON 对象
# 'doc' 是键名,'<span class="math-inline">' 表示根路径,data 是要存储的 Python 字典
r\.json\(\)\.set\('doc', '</span>', data)

print("--- 存储后的数据 ---")
# 使用 json().get() 获取整个 JSON 文档
doc = r.json().get('doc', '<span class="math-inline">'\)
print\(f"整个文档\: \{doc\}"\)
\# 获取 JSON 文档中特定路径的数据
dog \= r\.json\(\)\.get\('doc', '</span>.dog')
print(f"狗狗信息: {dog}")

# 获取嵌套路径的数据,例如狗狗的科学名称
scientific_name_dog = r.json().get('doc', '$.dog.scientific-name')
print(f"狗狗的科学名称: {scientific_name_dog}")

# 使用 JSONPath 的深层查询 (递归下降操作符 <span class="math-inline">\.\.\) 来获取所有科学名称
all\_scientific\_names \= r\.json\(\)\.get\('doc', '</span>..scientific-name')
print(f"所有科学名称: {all_scientific_names}")

# 更新 JSON 文档中的某个字段
r.json().set('doc', '<span class="math-inline">\.dog\.age', 4\)
updated\_dog\_age \= r\.json\(\)\.get\('doc', '</span>.dog.age')
print(f"更新后的狗狗年龄: {updated_dog_age}")

# 向数组中添加元素
r.json().arrappend('doc', '<span class="math-inline">\.dog\.toys', 'rope'\)
dog\_toys\_after\_append \= r\.json\(\)\.get\('doc', '</span>.dog.toys')
print(f"添加玩具后的狗狗玩具列表: {dog_toys_after_append}")

# 删除 JSON 文档中的某个字段
r.json().delete('doc', '<span class="math-inline">\.cat\.breed'\)
cat\_after\_delete\_breed \= r\.json\(\)\.get\('doc', '</span>.cat')
print(f"删除猫咪品种后的信息: {cat_after_delete_breed}")

3. RedisBloom 使用

RedisBloom 模块提供了一系列概率数据结构,其中最常用的是布隆过滤器 (Bloom Filter) 和 Cuckoo 过滤器。它们主要用于高效地判断一个元素是否存在于一个集合中,通常用于去重、缓存击穿防护和黑名单过滤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import redis

# 连接到 Redis
r = redis.Redis()

# 创建一个布隆过滤器
# r.bf().create("bloom", 0.01, 1000)
# 参数解释:
# "mybloomfilter": 布隆过滤器的键名
# 0.001: 错误率 (false positive probability),表示误判的概率,越小精度越高,但占用空间越大
# 5000: 预期要添加的元素数量,这个值会影响布隆过滤器的大小和效率
r.bf().create("mybloomfilter", 0.001, 5000)

# 添加元素到布隆过滤器
r.bf().add("mybloomfilter", "foo")
r.bf().add("mybloomfilter", "bar")
r.bf().add("mybloomfilter", "baz")

print("--- 布隆过滤器操作 ---")

# 检查元素是否存在
if r.bf().exists('mybloomfilter', "foo"):
print("foo 存在于布隆过滤器中!")
else:
print("foo 不存在于布隆过滤器中。")

if r.bf().exists('mybloomfilter', "qux"):
print("qux 存在于布隆过滤器中!")
else:
print("qux 不存在于布隆过滤器中。") # 预期输出:qux 不存在

# 批量添加元素
r.bf().madd("mybloomfilter", "apple", "orange", "grape")

# 批量检查元素
results = r.bf().mexists("mybloomfilter", "apple", "banana", "grape")
print(f"批量检查结果 (apple, banana, grape): {results}") # [1, 0, 1] (1表示可能存在,0表示一定不存在)

RedisBloom 使用说明:

  • 布隆过滤器特性:
    • 高效性: 空间效率高,查询速度快。
    • 概率性: 存在误判(false positive)的可能,即判断某个元素存在,但它实际上不存在。但它绝不会误报不存在(false negative),即如果布隆过滤器说一个元素不存在,那么它就一定不存在。
    • 不可删除性: 一般情况下,布隆过滤器中的元素不能被单独删除,因为删除会影响其他元素的判断。

RedisBloom 在需要高效去重、避免重复处理或快速判断是否存在大量数据时非常有用,比如:

  • 防止缓存击穿: 在查询数据库之前,先通过布隆过滤器判断数据是否存在,如果不存在则直接返回,避免大量请求直接打到数据库。
  • 爬虫URL去重: 避免重复爬取相同的 URL。
  • 垃圾邮件过滤: 识别已知的垃圾邮件地址。
  • 推荐系统: 过滤掉用户已经看过的内容。

4. RedisSearch 使用

RedisSearch 模块提供高性能的全文搜索、二级索引、聚合和数据过滤功能。它允许你在 Redis 中构建复杂的查询,而无需将数据迁移到其他专门的搜索数据库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import redis
from redis.commands.search.field import TagField, TextField, NumericField
from redis.commands.search.query import Query

# 连接到 Redis
r = redis.Redis()

# 定义索引 schema
# 定义字段,TagField 适用于精确匹配和过滤,TextField 适用于全文搜索,NumericField 适用于数值范围查询
schema = (
TagField("brand", as_name="brand"),
TextField("model", as_name="model", sortable=True), # sortable=True 允许按此字段排序
NumericField("year", as_name="year"),
NumericField("price", as_name="price", sortable=True)
)

# 创建索引
# "idx:cars" 是索引名称
# prefix=["car:"] 表示只有键名前缀为 "car:" 的数据才会被索引
r.ft("idx:cars").create_index(schema, prefix=["car:"])

# 添加一些车辆数据 (使用 HSET 来存储数据,RedisSearch 会从哈希中读取字段)
r.hset("car:1", mapping={"brand": "Toyota", "model": "Camry", "year": 2020, "price": 25000})
r.hset("car:2", mapping={"brand": "Honda", "model": "Civic", "year": 2022, "price": 22000})
r.hset("car:3", mapping={"brand": "Toyota", "model": "Corolla", "year": 2021, "price": 23000})
r.hset("car:4", mapping={"brand": "BMW", "model": "X5", "year": 2023, "price": 60000})
r.hset("car:5", mapping={"brand": "Mercedes", "model": "C-Class", "year": 2022, "price": 55000})
r.hset("car:6", mapping={"brand": "Honda", "model": "CRV", "year": 2023, "price": 30000})

print("--- RedisSearch 查询 ---")

# 全文搜索:查找包含 "Honda" 或 "Toyota" 的汽车
query = Query("Honda | Toyota")
result = r.ft("idx:cars").search(query)
print(f"搜索 'Honda | Toyota' 的结果 ({result.total} 条):")
for doc in result.docs:
print(f" Brand: {doc.brand}, Model: {doc.model}, Year: {doc.year}, Price: {doc.price}")

# 组合查询:查找品牌为 "Toyota" 且年份大于 2020 的汽车
query = Query("@brand:{Toyota} @year:[2021 2023]") # 使用 @field:{value} 语法进行标签/精确匹配,[min max] 进行数值范围
result = r.ft("idx:cars").search(query)
print(f"\n搜索 'Toyota' 且年份在 2021-2023 的结果 ({result.total} 条):")
for doc in result.docs:
print(f" Brand: {doc.brand}, Model: {doc.model}, Year: {doc.year}, Price: {doc.price}")

# 排序和限制结果
query = Query("*").sort_by("price", asc=False).return_field("model").return_field("price").paging(0, 3) # 按价格降序,只返回model和price,取前3条
result = r.ft("idx:cars").search(query)
print(f"\n按价格降序排列的前3辆车 ({result.total} 条):")
for doc in result.docs:
print(f" Model: {doc.model}, Price: {doc.price}")

# 删除索引 (如果需要重新创建或清理)
# r.ft("idx:cars").dropindex()

RedisSearch 使用说明:

  • 索引创建: 使用 ft().create_index() 定义要索引的字段和类型。
  • 数据存储: RedisSearch 可以索引 Redis HashJSON 数据类型的数据。在示例中,我们使用 HSET 存储车辆信息。
  • 查询语法: RedisSearch 支持一套丰富的查询语法,包括全文搜索、标签过滤、数值范围、地理空间查询等。
  • 性能: 通过倒排索引和高效的数据结构,RedisSearch 能够提供非常快的搜索速度。

RedisSearch 是构建实时搜索、数据过滤和分析应用的关键工具

5. RedisGraph 使用

RedisGraph 是一个高性能的图数据库模块,它利用稀疏邻接矩阵存储图数据,并使用 Cypher 查询语言进行数据操作。它非常适合处理节点和关系之间复杂连接的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import redis
from redis.commands.graph import Graph

# 连接到 Redis
r = redis.Redis()
# 获取 RedisGraph 客户端实例
graph = Graph(r, "social-network") # "social-network" 是图的名称

# 清理旧数据 (可选,用于每次运行示例时重置图)
# graph.delete()

# 创建节点和关系
# 使用 Cypher 语言定义图结构
# CREATE (:Person {name: 'Alice', age: 30})
# CREATE (:Person {name: 'Bob', age: 25})
# CREATE (:Person {name: 'Charlie', age: 35})
# CREATE (:Movie {title: 'Inception', release_year: 2010})
# CREATE (:Movie {title: 'The Matrix', release_year: 1999})
# CREATE (p1:Person {name: 'Alice'})-[:FRIENDS_WITH]->(p2:Person {name: 'Bob'})
# CREATE (p1)-[:WATCHED {rating: 9}]->(m1:Movie {title: 'Inception'})
# CREATE (p2)-[:WATCHED {rating: 8}]->(m1)
# CREATE (p3:Person {name: 'Charlie'})-[:FRIENDS_WITH]->(p1)

query_create = """
CREATE (:Person {name: 'Alice', age: 30}),
(:Person {name: 'Bob', age: 25}),
(:Person {name: 'Charlie', age: 35}),
(:Movie {title: 'Inception', release_year: 2010}),
(:Movie {title: 'The Matrix', release_year: 1999}),
(:Movie {title: 'Dune', release_year: 2021}),
(a:Person {name: 'Alice'})-[:FRIENDS_WITH]->(b:Person {name: 'Bob'}),
(a)-[:WATCHED {rating: 9}]->(i:Movie {title: 'Inception'}),
(b)-[:WATCHED {rating: 8}]->(i),
(c:Person {name: 'Charlie'})-[:FRIENDS_WITH]->(a),
(a)-[:WATCHED {rating: 7}]->(:Movie {title: 'Dune'})
"""
graph.query(query_create)
print("--- RedisGraph 查询 ---")

# 查询:查找所有 Person 节点
query_persons = "MATCH (p:Person) RETURN p.name, p.age"
result_persons = graph.query(query_persons)
print("所有人物:")
for record in result_persons.result_set:
print(f" 姓名: {record[0]}, 年龄: {record[1]}")

# 查询:查找观看过 'Inception' 且评分大于 8 的人
query_watched_inception = """
MATCH (p:Person)-[w:WATCHED]->(m:Movie)
WHERE m.title = 'Inception' AND w.rating > 8
RETURN p.name AS Viewer, w.rating AS Rating, m.title AS MovieTitle
"""
result_watched = graph.query(query_watched_inception)
print("\n观看 'Inception' 且评分大于 8 的人:")
for record in result_watched.result_set:
print(f" 观看者: {record[0]}, 评分: {record[1]}, 电影: {record[2]}")

# 查询:查找 Alice 的所有朋友以及他们观看过的电影
query_alice_friends_movies = """
MATCH (alice:Person {name: 'Alice'})-[:FRIENDS_WITH]-(friend:Person)
OPTIONAL MATCH (friend)-[:WATCHED]->(movie:Movie)
RETURN friend.name AS FriendName, COLLECT(movie.title) AS WatchedMovies
"""
result_friends_movies = graph.query(query_alice_friends_movies)
print("\nAlice 的朋友及他们观看的电影:")
for record in result_friends_movies.result_set:
print(f" 朋友: {record[0]}, 观看的电影: {record[1] if record[1] else '无'}")

# 删除整个图 (可选,用于清理)
# graph.delete()

RedisGraph 使用说明:

  • 图模型: RedisGraph 使用节点 (Nodes)关系 (Relationships) 来表示数据,每个节点和关系都可以有属性。
  • Cypher 查询语言: RedisGraph 使用 Cypher,这是一种声明式图查询语言,语法直观且功能强大。
  • 应用场景: 非常适合处理复杂关联数据,如社交网络分析、推荐系统、知识图谱、网络拓扑管理等。

RedisGraph 提供了在 Redis 生态系统中构建高性能图应用的能力,无需引入独立的图数据库。

6. RedisTimeSeries 使用

RedisTimeSeries 模块用于存储和查询时间序列数据。它针对时间序列数据进行了优化,支持高吞吐量的写入和查询,并提供了聚合、下采样、数据保留策略等功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import redis
import time
from datetime import datetime, timedelta

# 连接到 Redis
r = redis.Redis()

# 创建时间序列 (可选,如果已存在则跳过)
# r.ts().create("sensor:temperature") # 创建一个名为 "sensor:temperature" 的时间序列
# r.ts().create("sensor:humidity", retention_msecs=60 * 60 * 1000) # 设置数据保留时间为 1 小时

print("--- RedisTimeSeries 操作 ---")

# 添加数据点
# ts().add(key, timestamp, value, **kwargs)
# timestamp 可以是 Unix 毫秒时间戳,也可以是 '*' 表示当前时间

# 记录当前时间
now_ms = int(time.time() * 1000)

print("添加数据点...")
# 模拟在不同时间点添加温度数据
r.ts().add("sensor:temperature", now_ms, 25.5)
time.sleep(0.1) # 模拟时间流逝
r.ts().add("sensor:temperature", now_ms + 100, 25.7)
time.sleep(0.1)
r.ts().add("sensor:temperature", now_ms + 200, 25.8)
time.sleep(0.1)
r.ts().add("sensor:temperature", now_ms + 300, 26.0)
time.sleep(0.1)
r.ts().add("sensor:temperature", now_ms + 400, 25.9)

# 也可以直接使用 '*' 作为时间戳,Redis 会自动使用当前服务器时间
r.ts().add("sensor:temperature", '*', 26.1)
r.ts().add("sensor:temperature", '*', 26.2)
r.ts().add("sensor:temperature", '*', 26.0)

# 获取最新的数据点
latest_temperature = r.ts().get("sensor:temperature")
print(f"\n最新温度: {latest_temperature}") # (时间戳, 值)

# 查询时间范围数据
# 从过去 5 秒到现在的温度数据
five_seconds_ago_ms = int((datetime.now() - timedelta(seconds=5)).timestamp() * 1000)
range_data = r.ts().range("sensor:temperature", five_seconds_ago_ms, now_ms + 1000)
print(f"\n过去几秒的温度数据 ({len(range_data)} 条):")
for ts, val in range_data:
print(f" 时间: {datetime.fromtimestamp(ts / 1000).strftime('%H:%M:%S.%f')[:-3]}, 值: {val}")

# 使用聚合查询:获取过去 10 秒内每 1 秒的平均温度
# from_timestamp_ms = int((datetime.now() - timedelta(seconds=10)).timestamp() * 1000)
# to_timestamp_ms = int(datetime.now().timestamp() * 1000)
from_timestamp_ms = now_ms - 1000 # 从初始时间戳开始的1秒前
to_timestamp_ms = now_ms + 10000 # 初始时间戳的10秒后


aggregated_data = r.ts().range(
"sensor:temperature",
from_timestamp_ms,
to_timestamp_ms,
aggregation_type='AVG', # 聚合类型:平均值 (SUM, MIN, MAX, COUNT, FIRST, LAST, STDD.P, VAR.P 等)
bucket_size_msec=1000 # 桶大小:每 1000 毫秒 (1 秒) 聚合一次
)
print(f"\n过去几秒每秒平均温度 ({len(aggregated_data)} 条):")
for ts, val in aggregated_data:
print(f" 时间: {datetime.fromtimestamp(ts / 1000).strftime('%H:%M:%S')}, 平均值: {val}")

# 获取时间序列信息
info = r.ts().info("sensor:temperature")
print(f"\n时间序列 'sensor:temperature' 的信息: {info}")

RedisTimeSeries 使用说明:

  • 数据点: 每个数据点由一个时间戳和一个组成。时间戳通常是 Unix 毫秒时间戳。
  • 创建: ts().create() 用于创建时间序列,可以设置保留时间 (retention_msecs)。
  • 添加: ts().add() 用于添加数据点。
  • 查询: ts().range() 用于查询某个时间范围内的数据,并支持各种聚合类型(如 AVG, SUM, MIN, MAX)和桶大小 (bucket_size_msec) 进行下采样。
  • 性能: RedisTimeSeries 采用特殊的数据结构来存储时间序列数据,以实现高效的写入和查询。

RedisTimeSeries 适用于需要处理大量时间序列数据的场景,如:

  • 物联网 (IoT) 数据: 传感器数据采集和分析。
  • 监控和警报: 系统性能指标监控。
  • 金融数据: 股票价格、交易量等。
  • 日志分析: 记录和查询事件发生频率。

Redis-Stack 使用
https://kingjem.github.io/2025/06/07/Redis-stack 使用/
作者
Ruhai
发布于
2025年6月7日
许可协议