Milvus 向量数据库完整指南
一、核心理论
1.1 什么是 Milvus
Milvus 是一个云原生的开源向量数据库,专为 AI 应用设计,用于高效存储和检索海量非结构化数据(图像、视频、文本等的向量嵌入)。
1.2 核心架构组件
| 组件 | 功能 |
|---|---|
| Proxy | 请求入口,负责认证、参数校验、路由分发 |
| RootCoord | 元数据管理,创建/删除集合、分区管理 |
| QueryCoord | 查询协调,管理查询任务分配(2.6+ 合并到 MixCoord) |
| QueryNode | 执行向量搜索和过滤 |
| DataCoord | 数据段协调,管理 Segment 分布 |
| DataNode | 数据持久化、索引构建、Compaction(2.6 合并 IndexNode) |
| Streaming Node | 流数据处理(2.6 新增) |
1.3 一致性级别
python
# Strong - 强一致性,等待所有节点同步
consistency_level="Strong"
# Bounded - 有界一致性(默认),容忍指定时间范围内的数据延迟
consistency_level="Bounded"
# Session - 会话一致性,同一会话可见性保证
consistency_level="Session"
# Eventually - 最终一致性,不等待同步
consistency_level="Eventually"1.4 Segment 段管理
- Segment: 数据存储的基本单位
- Compaction: 段合并策略
merging: 小段合并成大段expired: 删除过期数据clustering: 基于聚类键的优化合并
二、完整语法(PyMilvus SDK)
2.1 连接管理
python
from pymilvus import MilvusClient
# 本地 Milvus Lite
client = MilvusClient(uri="milvus.db")
# 远程 Milvus Server
client = MilvusClient(
uri="http://localhost:19530",
token="root:Milvus"
)
# Milvus Cloud
client = MilvusClient(
uri="YOUR_MILVUS_CLOUD_ENDPOINT",
token="YOUR_TOKEN"
)2.2 Schema 定义
python
from pymilvus import DataType, Function, FunctionType
schema = client.create_schema(auto_id=True)
# 字段类型
schema.add_field("id", DataType.INT64, is_primary=True, auto_id=True) # 主键
schema.add_field("name", DataType.VARCHAR, max_length=100) # 字符串
schema.add_field("age", DataType.INT32) # 整数
schema.add_field("score", DataType.FLOAT) # 浮点数
schema.add_field("vector", DataType.FLOAT_VECTOR, dim=768) # 稠密向量
schema.add_field("sparse_vec", DataType.SPARSE_FLOAT_VECTOR) # 稀疏向量
schema.add_field("tags", DataType.ARRAY, max_capacity=10, element_type=DataType.VARCHAR) # 数组
# BM25 函数(自动从文本生成稀疏向量)
bm25_fn = Function(
name="bm25_function",
input_field_names=["text"],
output_field_names=["sparse_vector"],
function_type=FunctionType.BM25
)
schema.add_function(bm25_fn)2.3 索引配置
python
index_params = client.prepare_index_params()
# HNSW - 高召回率,适合内存充足场景
index_params.add_index(
field_name="vector",
index_type="HNSW",
metric_type="COSINE", # L2, IP, COSINE
params={
"M": 16, # 每个节点的连接数(越大召回越高,内存越多)
"efConstruction": 200 # 构建时的候选集大小
}
)
# IVF_FLAT - 中等召回,内存友好
index_params.add_index(
field_name="vector",
index_type="IVF_FLAT",
metric_type="L2",
params={"nlist": 128} # 聚类数量
)
# IVF_PQ - 极低内存占用,召回稍低
index_params.add_index(
field_name="vector",
index_type="IVF_PQ",
metric_type="IP",
params={
"nlist": 128, # 聚类数
"nbits": 8 # 量化位数
}
)
# DiskANN - 数据超出内存时使用 SSD
index_params.add_index(
field_name="vector",
index_type="DISKANN",
metric_type="L2",
params={"use_sliding": False}
)
# AUTOINDEX - 自动选择最佳索引(推荐默认)
index_params.add_index(
field_name="vector",
index_type="AUTOINDEX",
metric_type="COSINE"
)
# 稀疏向量索引
index_params.add_index(
field_name="sparse_vector",
index_type="SPARSE_INVERTED_INDEX",
metric_type="BM25",
params={"drop_ratio_build": 0.2}
)2.4 集合操作
python
# 创建集合
client.create_collection(
collection_name="my_collection",
schema=schema,
index_params=index_params,
consistency_level="Bounded"
)
# 检查集合是否存在
exists = client.has_collection("my_collection")
# 查看集合信息
info = client.describe_collection("my_collection")
# 加载集合到内存
client.load_collection("my_collection")
# 释放集合
client.release_collection("my_collection")
# 删除集合
client.drop_collection("my_collection")2.5 数据插入
python
# 单条插入
single = {
"id": 1,
"name": "Product A",
"vector": [0.1, 0.2, ...], # 768 维
"price": 29.99
}
result = client.insert("my_collection", single)
# 批量插入
batch_data = [
{"id": i, "name": f"Item {i}", "vector": random_vec, "price": float(i)}
for i in range(1000)
]
result = client.insert("my_collection", batch_data, progress_bar=True)
# 获取插入结果
print(f"Inserted: {result['insert_count']} rows")2.6 向量搜索
python
# 基础搜索
results = client.search(
collection_name="my_collection",
data=[query_vector], # 查询向量
anns_field="vector", # 向量字段名
limit=10, # 返回数量
search_params={
"metric_type": "COSINE",
"params": {
"ef": 100, # HNSW: 搜索范围(越大越准越慢)
"nprobe": 32 # IVF: 探测聚类数
}
},
output_fields=["name", "price"], # 返回标量字段
filter="price > 20 and price < 100" # 标量过滤
)
# 遍历结果
for hits in results:
for hit in hits:
print(f"ID: {hit['id']}, Score: {hit['distance']:.4f}")
print(f"Data: {hit['entity']}")2.7 混合检索(Hybrid Search)
python
from pymilvus import AnnSearchRequest, RRFRanker
# 稠密向量搜索请求
dense_req = AnnSearchRequest(
data=[dense_query_vec],
anns_field="dense_vector",
param={"metric_type": "COSINE"},
limit=10
)
# 稀疏向量/BM25 搜索请求
sparse_req = AnnSearchRequest(
data=[sparse_query_vec],
anns_field="sparse_vector",
param={"metric_type": "BM25"},
limit=10
)
# 多路召回 + RRF 融合
results = client.hybrid_search(
collection_name="docs",
reqs=[dense_req, sparse_req],
ranker=RRFRanker(), # RRF 排名融合
limit=10,
output_fields=["text", "source"]
)2.8 标量查询
python
# 按主键查询
results = client.query(
collection_name="my_collection",
ids=[1, 2, 3],
output_fields=["name", "price"]
)
# 过滤查询
results = client.query(
collection_name="products",
filter='category == "electronics" and in_stock == true',
output_fields=["*"],
limit=100
)
# 复杂表达式
filter_expr = """
age >= 18 and age <= 60
and city in ["Beijing", "Shanghai"]
and tags.contains("VIP")
"""2.9 分区管理
python
# 创建分区
client.create_partition("collection", "partition_2024")
# 检查分区
has_part = client.has_partition("collection", "partition_2024")
# 列出分区
partitions = client.list_partitions("collection")
# 分区插入/搜索
client.insert("collection", data, partition_name="partition_2024")
results = client.search(..., partition_names=["p1", "p2"])
# 删除分区
client.drop_partition("collection", "partition_2024")2.10 删除数据
python
# 按 ID 删除
client.delete(
collection_name="my_collection",
ids=[1, 2, 3]
)
# 按条件删除
client.delete(
collection_name="my_collection",
filter="price < 10"
)三、索引类型详解
3.1 索引对比矩阵
| 索引类型 | 适用场景 | 召回率 | 速度 | 内存占用 |
|---|---|---|---|---|
| FLAT | 精确搜索,小规模数据 (<10 万) | 100% | 慢 | 低 |
| HNSW | 高召回需求,内存充足 | >99% | 快 | 高 |
| HNSW_SQ/PQ | 需要压缩内存的 HNSW | 95-99% | 快 | 中 |
| IVF_FLAT | 中等召回,平衡方案 | 90-95% | 较快 | 中 |
| IVF_PQ | 超大规模,内存受限 | 85-95% | 快 | 低 |
| SCANN | 高维向量优化 | 90-98% | 快 | 中 |
| DiskANN | 数据>内存,SSD 存储 | 95-99% | 中 | 低 |
| AUTOINDEX | 通用场景(推荐) | 自适应 | 自适应 | 自适应 |
3.2 关键参数调优
python
# HNSW 参数
{
"M": 16-48, # 层间连接数,越大召回越高
"efConstruction": 100-500, # 构建时搜索宽度
"ef": 10-500 # 搜索时搜索宽度(运行时可调)
}
# IVF 参数
{
"nlist": 64-2048, # 聚类数,约等于 sqrt(N)/1000
"nprobe": 8-256 # 搜索时探测聚类数
}
# PQ 参数
{
"m": dim/16, # 子向量维度
"nbits": 4-8 # 每维量化位数
}四、距离度量
4.1 度量类型
| 度量 | 适用场景 | 说明 |
|---|---|---|
| L2 (欧氏距离) | 聚类、kNN | 值越小越相似 |
| IP (内积) | 归一化向量、词向量 | 值越大越相似 |
| COSINE (余弦) | 文本、语义相似度 | 值越大越相似 |
| HAMMING | 二进制哈希 | 值越小越相似 |
| JACCARD | 集合相似度 | 值越小越相似 |
4.2 向量归一化
python
# COSINE 度量需要将向量归一化
import numpy as np
def normalize(vector):
return vector / np.linalg.norm(vector)
normalized_vec = normalize(query_vector)五、性能优化
5.1 索引优化策略
python
# 根据数据规模选择索引
if num_entities < 100_000:
index_type = "FLAT" # 精确搜索
elif num_entities < 10_000_000 and ram_sufficient:
index_type = "HNSW" # 高召回
elif num_entities > ram_size:
index_type = "DiskANN" # 磁盘存储
else:
index_type = "IVF_PQ" # 内存压缩5.2 搜索参数调优
python
# 召回率 vs 速度的权衡
search_params = {
"params": {
"nprobe": 32, # IVF: 增加提升召回但变慢
"ef": 100, # HNSW: 增加提升召回但变慢
}
}
# 高召回模式
high_recall = {"params": {"nprobe": 128, "ef": 400}}
# 高速模式
fast_mode = {"params": {"nprobe": 8, "ef": 20}}5.3 批量操作最佳实践
python
# 批量插入(每次 500-1000 条)
BATCH_SIZE = 500
for i in range(0, len(data), BATCH_SIZE):
batch = data[i:i+BATCH_SIZE]
client.insert(collection_name, batch)
# 批量搜索
results = client.search(
collection_name="coll",
data=[vec1, vec2, vec3], # 多查询向量
limit=10,
nq=3 # 查询数量
)5.4 资源管理
yaml
# docker-compose 资源配置
querynode:
resources:
limits:
memory: 8Gi
cpu: "4"
datanode:
resources:
limits:
memory: 16Gi
cpu: "8"5.5 标量索引加速过滤
python
# 为高频过滤字段创建标量索引
client.create_index(
collection_name="products",
field_name="category",
index_type="INVERTED" # 字符串倒排索引
)
client.create_index(
collection_name="products",
field_name="price",
index_type="SCALAR" # 数值区间索引
)六、RAG 应用场景
6.1 混合检索 RAG Pipeline
python
from pymilvus import MilvusClient, DataType, Function, FunctionType, AnnSearchRequest, RRFRanker
# 创建支持混合检索的 Schema
schema = client.create_schema(auto_id=True)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("text", DataType.VARCHAR, max_length=2048, enable_analyzer=True)
schema.add_field("sparse_vec", DataType.SPARSE_FLOAT_VECTOR)
schema.add_field("dense_vec", DataType.FLOAT_VECTOR, dim=1536)
# BM25 自动函数
bm25_fn = Function("bm25", ["text"], ["sparse_vec"], FunctionType.BM25)
schema.add_function(bm25_fn)
# 双路索引
index_params = client.prepare_index_params()
index_params.add_index("dense_vec", "AUTOINDEX", "COSINE")
index_params.add_index("sparse_vec", "SPARSE_INVERTED_INDEX", "BM25")
client.create_collection("rag_docs", schema, index_params)
# RAG 查询
def rag_query(question):
dense_vec = embed_model.encode(question)
dense_req = AnnSearchRequest([dense_vec], "dense_vec", {}, 5)
sparse_req = AnnSearchRequest([question], "sparse_vec", {"metric_type": "BM25"}, 5)
results = client.hybrid_search(
"rag_docs",
[dense_req, sparse_req],
RRFRanker(),
limit=5,
output_fields=["text"]
)
context = "\n".join([hit['entity']['text'] for hit in results[0]])
return llm.generate(f"{context}\n\nQ: {question}\nA:")七、快速参考表
7.1 常用 API
| 操作 | API |
|---|---|
| 创建集合 | create_collection() |
| 插入数据 | insert() |
| 向量搜索 | search() |
| 混合搜索 | hybrid_search() |
| 标量查询 | query() |
| 删除数据 | delete() |
| 创建索引 | create_index() |
| 加载集合 | load_collection() |
7.2 字段类型
| 类型 | 用途 |
|---|---|
| INT64 | 主键、大整数 |
| VARCHAR | 字符串(需指定 max_length) |
| FLOAT | 浮点数 |
| FLOAT_VECTOR | 稠密向量(需指定 dim) |
| SPARSE_FLOAT_VECTOR | 稀疏向量(dict 格式) |
| ARRAY | 数组类型 |
八、官方文档
- 主站:https://milvus.io/docs
- GitHub: https://github.com/milvus-io/milvus
- PyMilvus SDK: https://github.com/milvus-io/pymilvus
