Skip to content

Milvus 向量数据库完整指南

一、核心理论

1.1 什么是 Milvus

Milvus 是一个云原生的开源向量数据库,专为 AI 应用设计,用于高效存储和检索海量非结构化数据(图像、视频、文本等的向量嵌入)。

1.2 核心架构组件

组件功能
Proxy请求入口,负责认证、参数校验、路由分发
RootCoord元数据管理,创建/删除集合、分区管理
QueryCoord查询协调,管理查询任务分配(2.6+ 合并到 MixCoord)
QueryNode执行向量搜索和过滤
DataCoord数据段协调,管理 Segment 分布
DataNode数据持久化、索引构建、Compaction(2.6 合并 IndexNode)
Streaming Node流数据处理(2.6 新增)

1.3 一致性级别

python
# Strong - 强一致性,等待所有节点同步
consistency_level="Strong"

# Bounded - 有界一致性(默认),容忍指定时间范围内的数据延迟
consistency_level="Bounded"  

# Session - 会话一致性,同一会话可见性保证
consistency_level="Session"

# Eventually - 最终一致性,不等待同步
consistency_level="Eventually"

1.4 Segment 段管理

  • Segment: 数据存储的基本单位
  • Compaction: 段合并策略
    • merging: 小段合并成大段
    • expired: 删除过期数据
    • clustering: 基于聚类键的优化合并

二、完整语法(PyMilvus SDK)

2.1 连接管理

python
from pymilvus import MilvusClient

# 本地 Milvus Lite
client = MilvusClient(uri="milvus.db")

# 远程 Milvus Server
client = MilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

# Milvus Cloud
client = MilvusClient(
    uri="YOUR_MILVUS_CLOUD_ENDPOINT",
    token="YOUR_TOKEN"
)

2.2 Schema 定义

python
from pymilvus import DataType, Function, FunctionType

schema = client.create_schema(auto_id=True)

# 字段类型
schema.add_field("id", DataType.INT64, is_primary=True, auto_id=True)      # 主键
schema.add_field("name", DataType.VARCHAR, max_length=100)                 # 字符串
schema.add_field("age", DataType.INT32)                                    # 整数
schema.add_field("score", DataType.FLOAT)                                  # 浮点数
schema.add_field("vector", DataType.FLOAT_VECTOR, dim=768)                 # 稠密向量
schema.add_field("sparse_vec", DataType.SPARSE_FLOAT_VECTOR)               # 稀疏向量
schema.add_field("tags", DataType.ARRAY, max_capacity=10, element_type=DataType.VARCHAR)  # 数组

# BM25 函数(自动从文本生成稀疏向量)
bm25_fn = Function(
    name="bm25_function",
    input_field_names=["text"],
    output_field_names=["sparse_vector"],
    function_type=FunctionType.BM25
)
schema.add_function(bm25_fn)

2.3 索引配置

python
index_params = client.prepare_index_params()

# HNSW - 高召回率,适合内存充足场景
index_params.add_index(
    field_name="vector",
    index_type="HNSW",
    metric_type="COSINE",  # L2, IP, COSINE
    params={
        "M": 16,                    # 每个节点的连接数(越大召回越高,内存越多)
        "efConstruction": 200       # 构建时的候选集大小
    }
)

# IVF_FLAT - 中等召回,内存友好
index_params.add_index(
    field_name="vector",
    index_type="IVF_FLAT",
    metric_type="L2",
    params={"nlist": 128}           # 聚类数量
)

# IVF_PQ - 极低内存占用,召回稍低
index_params.add_index(
    field_name="vector",
    index_type="IVF_PQ",
    metric_type="IP",
    params={
        "nlist": 128,               # 聚类数
        "nbits": 8                  # 量化位数
    }
)

# DiskANN - 数据超出内存时使用 SSD
index_params.add_index(
    field_name="vector",
    index_type="DISKANN",
    metric_type="L2",
    params={"use_sliding": False}
)

# AUTOINDEX - 自动选择最佳索引(推荐默认)
index_params.add_index(
    field_name="vector",
    index_type="AUTOINDEX",
    metric_type="COSINE"
)

# 稀疏向量索引
index_params.add_index(
    field_name="sparse_vector",
    index_type="SPARSE_INVERTED_INDEX",
    metric_type="BM25",
    params={"drop_ratio_build": 0.2}
)

2.4 集合操作

python
# 创建集合
client.create_collection(
    collection_name="my_collection",
    schema=schema,
    index_params=index_params,
    consistency_level="Bounded"
)

# 检查集合是否存在
exists = client.has_collection("my_collection")

# 查看集合信息
info = client.describe_collection("my_collection")

# 加载集合到内存
client.load_collection("my_collection")

# 释放集合
client.release_collection("my_collection")

# 删除集合
client.drop_collection("my_collection")

2.5 数据插入

python
# 单条插入
single = {
    "id": 1,
    "name": "Product A",
    "vector": [0.1, 0.2, ...],  # 768 维
    "price": 29.99
}
result = client.insert("my_collection", single)

# 批量插入
batch_data = [
    {"id": i, "name": f"Item {i}", "vector": random_vec, "price": float(i)}
    for i in range(1000)
]
result = client.insert("my_collection", batch_data, progress_bar=True)

# 获取插入结果
print(f"Inserted: {result['insert_count']} rows")

2.6 向量搜索

python
# 基础搜索
results = client.search(
    collection_name="my_collection",
    data=[query_vector],           # 查询向量
    anns_field="vector",           # 向量字段名
    limit=10,                      # 返回数量
    search_params={
        "metric_type": "COSINE",
        "params": {
            "ef": 100,             # HNSW: 搜索范围(越大越准越慢)
            "nprobe": 32           # IVF: 探测聚类数
        }
    },
    output_fields=["name", "price"],  # 返回标量字段
    filter="price > 20 and price < 100"  # 标量过滤
)

# 遍历结果
for hits in results:
    for hit in hits:
        print(f"ID: {hit['id']}, Score: {hit['distance']:.4f}")
        print(f"Data: {hit['entity']}")
python
from pymilvus import AnnSearchRequest, RRFRanker

# 稠密向量搜索请求
dense_req = AnnSearchRequest(
    data=[dense_query_vec],
    anns_field="dense_vector",
    param={"metric_type": "COSINE"},
    limit=10
)

# 稀疏向量/BM25 搜索请求
sparse_req = AnnSearchRequest(
    data=[sparse_query_vec],
    anns_field="sparse_vector",
    param={"metric_type": "BM25"},
    limit=10
)

# 多路召回 + RRF 融合
results = client.hybrid_search(
    collection_name="docs",
    reqs=[dense_req, sparse_req],
    ranker=RRFRanker(),           # RRF 排名融合
    limit=10,
    output_fields=["text", "source"]
)

2.8 标量查询

python
# 按主键查询
results = client.query(
    collection_name="my_collection",
    ids=[1, 2, 3],
    output_fields=["name", "price"]
)

# 过滤查询
results = client.query(
    collection_name="products",
    filter='category == "electronics" and in_stock == true',
    output_fields=["*"],
    limit=100
)

# 复杂表达式
filter_expr = """
    age >= 18 and age <= 60 
    and city in ["Beijing", "Shanghai"] 
    and tags.contains("VIP")
"""

2.9 分区管理

python
# 创建分区
client.create_partition("collection", "partition_2024")

# 检查分区
has_part = client.has_partition("collection", "partition_2024")

# 列出分区
partitions = client.list_partitions("collection")

# 分区插入/搜索
client.insert("collection", data, partition_name="partition_2024")
results = client.search(..., partition_names=["p1", "p2"])

# 删除分区
client.drop_partition("collection", "partition_2024")

2.10 删除数据

python
# 按 ID 删除
client.delete(
    collection_name="my_collection",
    ids=[1, 2, 3]
)

# 按条件删除
client.delete(
    collection_name="my_collection",
    filter="price < 10"
)

三、索引类型详解

3.1 索引对比矩阵

索引类型适用场景召回率速度内存占用
FLAT精确搜索,小规模数据 (<10 万)100%
HNSW高召回需求,内存充足>99%
HNSW_SQ/PQ需要压缩内存的 HNSW95-99%
IVF_FLAT中等召回,平衡方案90-95%较快
IVF_PQ超大规模,内存受限85-95%
SCANN高维向量优化90-98%
DiskANN数据>内存,SSD 存储95-99%
AUTOINDEX通用场景(推荐)自适应自适应自适应

3.2 关键参数调优

python
# HNSW 参数
{
    "M": 16-48,                    # 层间连接数,越大召回越高
    "efConstruction": 100-500,     # 构建时搜索宽度
    "ef": 10-500                   # 搜索时搜索宽度(运行时可调)
}

# IVF 参数  
{
    "nlist": 64-2048,             # 聚类数,约等于 sqrt(N)/1000
    "nprobe": 8-256               # 搜索时探测聚类数
}

# PQ 参数
{
    "m": dim/16,                  # 子向量维度
    "nbits": 4-8                  # 每维量化位数
}

四、距离度量

4.1 度量类型

度量适用场景说明
L2 (欧氏距离)聚类、kNN值越小越相似
IP (内积)归一化向量、词向量值越大越相似
COSINE (余弦)文本、语义相似度值越大越相似
HAMMING二进制哈希值越小越相似
JACCARD集合相似度值越小越相似

4.2 向量归一化

python
# COSINE 度量需要将向量归一化
import numpy as np
def normalize(vector):
    return vector / np.linalg.norm(vector)

normalized_vec = normalize(query_vector)

五、性能优化

5.1 索引优化策略

python
# 根据数据规模选择索引
if num_entities < 100_000:
    index_type = "FLAT"          # 精确搜索
elif num_entities < 10_000_000 and ram_sufficient:
    index_type = "HNSW"          # 高召回
elif num_entities > ram_size:
    index_type = "DiskANN"       # 磁盘存储
else:
    index_type = "IVF_PQ"        # 内存压缩

5.2 搜索参数调优

python
# 召回率 vs 速度的权衡
search_params = {
    "params": {
        "nprobe": 32,      # IVF: 增加提升召回但变慢
        "ef": 100,         # HNSW: 增加提升召回但变慢
    }
}

# 高召回模式
high_recall = {"params": {"nprobe": 128, "ef": 400}}

# 高速模式
fast_mode = {"params": {"nprobe": 8, "ef": 20}}

5.3 批量操作最佳实践

python
# 批量插入(每次 500-1000 条)
BATCH_SIZE = 500
for i in range(0, len(data), BATCH_SIZE):
    batch = data[i:i+BATCH_SIZE]
    client.insert(collection_name, batch)

# 批量搜索
results = client.search(
    collection_name="coll",
    data=[vec1, vec2, vec3],   # 多查询向量
    limit=10,
    nq=3                        # 查询数量
)

5.4 资源管理

yaml
# docker-compose 资源配置
querynode:
  resources:
    limits:
      memory: 8Gi
      cpu: "4"
datanode:
  resources:
    limits:
      memory: 16Gi
      cpu: "8"

5.5 标量索引加速过滤

python
# 为高频过滤字段创建标量索引
client.create_index(
    collection_name="products",
    field_name="category",
    index_type="INVERTED"       # 字符串倒排索引
)

client.create_index(
    collection_name="products", 
    field_name="price",
    index_type="SCALAR"         # 数值区间索引
)

六、RAG 应用场景

6.1 混合检索 RAG Pipeline

python
from pymilvus import MilvusClient, DataType, Function, FunctionType, AnnSearchRequest, RRFRanker

# 创建支持混合检索的 Schema
schema = client.create_schema(auto_id=True)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("text", DataType.VARCHAR, max_length=2048, enable_analyzer=True)
schema.add_field("sparse_vec", DataType.SPARSE_FLOAT_VECTOR)
schema.add_field("dense_vec", DataType.FLOAT_VECTOR, dim=1536)

# BM25 自动函数
bm25_fn = Function("bm25", ["text"], ["sparse_vec"], FunctionType.BM25)
schema.add_function(bm25_fn)

# 双路索引
index_params = client.prepare_index_params()
index_params.add_index("dense_vec", "AUTOINDEX", "COSINE")
index_params.add_index("sparse_vec", "SPARSE_INVERTED_INDEX", "BM25")

client.create_collection("rag_docs", schema, index_params)

# RAG 查询
def rag_query(question):
    dense_vec = embed_model.encode(question)
    
    dense_req = AnnSearchRequest([dense_vec], "dense_vec", {}, 5)
    sparse_req = AnnSearchRequest([question], "sparse_vec", {"metric_type": "BM25"}, 5)
    
    results = client.hybrid_search(
        "rag_docs",
        [dense_req, sparse_req],
        RRFRanker(),
        limit=5,
        output_fields=["text"]
    )
    
    context = "\n".join([hit['entity']['text'] for hit in results[0]])
    return llm.generate(f"{context}\n\nQ: {question}\nA:")

七、快速参考表

7.1 常用 API

操作API
创建集合create_collection()
插入数据insert()
向量搜索search()
混合搜索hybrid_search()
标量查询query()
删除数据delete()
创建索引create_index()
加载集合load_collection()

7.2 字段类型

类型用途
INT64主键、大整数
VARCHAR字符串(需指定 max_length)
FLOAT浮点数
FLOAT_VECTOR稠密向量(需指定 dim)
SPARSE_FLOAT_VECTOR稀疏向量(dict 格式)
ARRAY数组类型

八、官方文档

更新于:

note