为什么LLM应用必须认真对待缓存
调用一次GPT-4o,大概需要2-5秒,费用0.01-0.1美元。如果你的应用每天有10万次请求,其中有30%是相似或重复的问题,那么每个月你多付出了几万美元,用户还要额外等待数百万秒。缓存不是LLM应用的可选优化项,它是降本增效的核心武器。2026年,LLM缓存技术已经发展出多个层次:语义缓存、KV Cache、Prompt缓存、结果缓存、分层缓存架构。本文系统梳理这些技术,帮你构建一套完整的LLM缓存工程体系。—## 缓存层次全景LLM应用的缓存可以发生在多个层次:用户请求 ↓L1: 精确匹配缓存(同样的请求直接返回) ↓ (miss)L2: 语义相似缓存(含义相近的请求复用结果) ↓ (miss)L3: Prompt前缀缓存(共享的Prompt前缀复用KV Cache) ↓ (miss)L4: LLM API调用(实际推理) ↓结果写入各层缓存每一层都能节省时间和金钱,关键是在不同场景下选择合适的缓存策略。—## L1:精确匹配缓存最简单直接的缓存:对相同的输入,返回相同的输出。pythonimport hashlibimport jsonimport redisfrom typing import Optional, Dict, Anyclass ExactMatchCache: def __init__(self, redis_url: str, ttl_seconds: int = 3600): self.redis = redis.from_url(redis_url) self.ttl = ttl_seconds def _make_key(self, messages: list, model: str, params: dict) -> str: """生成缓存key,考虑所有影响输出的因素""" cache_input = { "messages": messages, "model": model, # 只包含影响输出的参数 "temperature": params.get("temperature", 1.0), "max_tokens": params.get("max_tokens"), "response_format": params.get("response_format"), } key_str = json.dumps(cache_input, sort_keys=True, ensure_ascii=False) return f"llm:exact:{hashlib.sha256(key_str.encode()).hexdigest()}" def get(self, messages: list, model: str, params: dict) -> Optional[dict]: key = self._make_key(messages, model, params) cached = self.redis.get(key) if cached: return json.loads(cached) return None def set(self, messages: list, model: str, params: dict, response: dict): key = self._make_key(messages, model, params) # 不缓存高温度(随机性高)的结果 if params.get("temperature", 1.0) > 0.5: return self.redis.setex(key, self.ttl, json.dumps(response, ensure_ascii=False)) def get_stats(self) -> dict: """获取缓存命中统计""" hit_count = int(self.redis.get("llm:stats:hits") or 0) miss_count = int(self.redis.get("llm:stats:misses") or 0) total = hit_count + miss_count return { "hits": hit_count, "misses": miss_count, "hit_rate": hit_count / total if total > 0 else 0 }适用场景:FAQ机器人、固定模板查询、文档生成等重复性高的场景。命中率通常在10-30%。—## L2:语义相似缓存语义缓存是2024-2026年LLM缓存领域最重要的进展。核心思路:用向量相似度代替精确字符串匹配。“明天北京天气怎么样"和"北京明天天气如何"本质上是同一个问题,精确缓存无法命中,但语义缓存可以。pythonimport numpy as npfrom sklearn.metrics.pairwise import cosine_similarityimport openaiclass SemanticCache: def __init__( self, embedding_model: str = "text-embedding-3-small", similarity_threshold: float = 0.95, max_cache_size: int = 10000 ): self.client = openai.OpenAI() self.embedding_model = embedding_model self.threshold = similarity_threshold self.cache_entries = [] # [(embedding, query, response)] self.max_size = max_cache_size def _get_embedding(self, text: str) -> np.ndarray: """获取文本的向量表示""" response = self.client.embeddings.create( model=self.embedding_model, input=text ) return np.array(response.data[0].embedding) def _extract_query_text(self, messages: list) -> str: """从消息列表中提取用于语义匹配的关键文本""" # 通常只用最后一条用户消息做语义匹配 for msg in reversed(messages): if msg.get("role") == "user": return msg["content"] return str(messages) def get(self, messages: list) -> Optional[dict]: """语义搜索缓存""" if not self.cache_entries: return None query_text = self._extract_query_text(messages) query_emb = self._get_embedding(query_text) # 计算与所有缓存条目的相似度 cached_embs = np.array([entry[0] for entry in self.cache_entries]) similarities = cosine_similarity([query_emb], cached_embs)[0] best_idx = np.argmax(similarities) best_score = similarities[best_idx] if best_score >= self.threshold: _, cached_query, cached_response = self.cache_entries[best_idx] return { "response": cached_response, "cache_hit": True, "similarity": float(best_score), "matched_query": cached_query } return None def set(self, messages: list, response: dict): """将新条目加入语义缓存""" query_text = self._extract_query_text(messages) embedding = self._get_embedding(query_text) self.cache_entries.append((embedding, query_text, response)) # LRU淘汰:超出容量时移除最旧的条目 if len(self.cache_entries) > self.max_size: self.cache_entries.pop(0) def get_with_metadata(self, messages: list) -> dict: """返回带元数据的缓存查询结果""" result = self.get(messages) if result: return result return {"response": None, "cache_hit": False}### 生产级语义缓存:使用向量数据库大规模部署时,用Qdrant等向量数据库替代内存存储:pythonfrom qdrant_client import QdrantClientfrom qdrant_client.models import ( Distance, VectorParams, PointStruct, SearchRequest)import uuidclass QdrantSemanticCache: def __init__( self, qdrant_url: str, collection_name: str = "llm_cache", similarity_threshold: float = 0.93 ): self.client = QdrantClient(url=qdrant_url) self.collection = collection_name self.threshold = similarity_threshold self._ensure_collection() def _ensure_collection(self): """确保向量集合存在""" collections = [c.name for c in self.client.get_collections().collections] if self.collection not in collections: self.client.create_collection( collection_name=self.collection, vectors_config=VectorParams(size=1536, distance=Distance.COSINE) ) async def search(self, query_embedding: list, top_k: int = 1) -> Optional[dict]: """在向量数据库中搜索相似缓存""" results = self.client.search( collection_name=self.collection, query_vector=query_embedding, limit=top_k, score_threshold=self.threshold, with_payload=True ) if results: best = results[0] return { "response": best.payload["response"], "similarity": best.score, "query": best.payload["query"] } return None async def insert( self, query: str, embedding: list, response: dict, metadata: dict = None ): """插入新的缓存条目""" point = PointStruct( id=str(uuid.uuid4()), vector=embedding, payload={ "query": query, "response": response, "metadata": metadata or {}, "created_at": int(time.time()) } ) self.client.upsert( collection_name=self.collection, points=[point] )—## L3:Prompt前缀缓存(KV Cache复用)OpenAI、Anthropic、Google等主流API提供商都支持Prompt Caching,对相同的前缀文本不重复计算,直接复用KV Cache。### OpenAI Prompt Caching的工程实践pythonclass PromptCacheOptimizer: """优化Prompt结构以最大化缓存命中率""" @staticmethod def build_cacheable_messages( system_prompt: str, static_context: str, # 文档、背景信息等固定内容 conversation_history: list, user_query: str ) -> list: """ 构建有利于缓存的消息结构。 缓存原则:把不变的内容放前面,变化的内容放后面。 OpenAI对1024+ tokens的前缀自动缓存,价格减少50%。 """ messages = [] # 1. 系统提示(最稳定,长度越长缓存收益越大) messages.append({ "role": "system", "content": system_prompt }) # 2. 静态上下文(文档、知识库等,相对稳定) if static_context: messages.append({ "role": "user", "content": f"以下是参考资料:\n\n{static_context}" }) messages.append({ "role": "assistant", "content": "好的,我已了解参考资料。请告诉我您的问题。" }) # 3. 历史对话(会话级稳定) messages.extend(conversation_history) # 4. 当前用户输入(每次都变,放最后) messages.append({ "role": "user", "content": user_query }) return messages @staticmethod def estimate_cache_savings( static_tokens: int, requests_per_day: int, model: str = "gpt-4o" ) -> dict: """估算Prompt缓存能节省的费用""" # gpt-4o 价格(2026年参考) prices = { "gpt-4o": {"input": 0.0025, "cached": 0.00125}, # per 1K tokens "gpt-4o-mini": {"input": 0.00015, "cached": 0.000075} } p = prices.get(model, prices["gpt-4o"]) # 假设70%的请求能命中前缀缓存 cache_hit_rate = 0.7 cached_requests = requests_per_day * cache_hit_rate normal_cost = static_tokens / 1000 * p["input"] * requests_per_day cached_cost = ( static_tokens / 1000 * p["input"] * (requests_per_day - cached_requests) + static_tokens / 1000 * p["cached"] * cached_requests ) return { "normal_daily_cost": round(normal_cost, 2), "cached_daily_cost": round(cached_cost, 2), "daily_savings": round(normal_cost - cached_cost, 2), "monthly_savings": round((normal_cost - cached_cost) * 30, 2), "savings_rate": round((1 - cached_cost / normal_cost) * 100, 1) }### Anthropic Claude的缓存控制pythondef build_claude_cached_messages( system_content: str, documents: list, user_query: str) -> dict: """构建Claude的显式缓存控制消息""" return { "model": "claude-opus-4-5", "system": [ { "type": "text", "text": system_content, "cache_control": {"type": "ephemeral"} # 标记为可缓存 } ], "messages": [ { "role": "user", "content": [ # 文档内容标记为可缓存 *[ { "type": "text", "text": doc, "cache_control": {"type": "ephemeral"} } for doc in documents ], # 用户问题不缓存(每次都变) { "type": "text", "text": user_query } ] } ] }—## 分层缓存协调器将所有缓存层整合成统一接口:pythonimport timefrom dataclasses import dataclassfrom typing import Optional, Tuple@dataclassclass CacheResult: response: dict hit_layer: str # "exact", "semantic", "prompt_cache", "miss" latency_ms: float cost_saved: float = 0.0class LayeredCacheCoordinator: def __init__( self, exact_cache: ExactMatchCache, semantic_cache: QdrantSemanticCache, embedding_client, llm_client ): self.exact = exact_cache self.semantic = semantic_cache self.embedder = embedding_client self.llm = llm_client # 统计 self.stats = {"exact": 0, "semantic": 0, "miss": 0} async def complete( self, messages: list, model: str = "gpt-4o", **kwargs ) -> CacheResult: start = time.time() # L1: 精确缓存 exact_result = self.exact.get(messages, model, kwargs) if exact_result: self.stats["exact"] += 1 return CacheResult( response=exact_result, hit_layer="exact", latency_ms=(time.time() - start) * 1000, cost_saved=self._estimate_cost(messages, model) ) # L2: 语义缓存 query_text = self._extract_last_user_message(messages) query_emb = await self.embedder.embed(query_text) semantic_result = await self.semantic.search(query_emb) if semantic_result: self.stats["semantic"] += 1 return CacheResult( response=semantic_result["response"], hit_layer="semantic", latency_ms=(time.time() - start) * 1000, cost_saved=self._estimate_cost(messages, model) * 0.8 ) # L3: 调用LLM(使用优化的前缀结构自动触发Prompt Cache) self.stats["miss"] += 1 response = await self.llm.chat.completions.create( model=model, messages=messages, **kwargs ) response_dict = response.model_dump() # 写入缓存 if kwargs.get("temperature", 1.0) <= 0.3: self.exact.set(messages, model, kwargs, response_dict) await self.semantic.insert(query_text, query_emb, response_dict) return CacheResult( response=response_dict, hit_layer="miss", latency_ms=(time.time() - start) * 1000 ) def _estimate_cost(self, messages: list, model: str) -> float: """估算节省的API调用费用""" total_chars = sum(len(m.get("content", "")) for m in messages) estimated_tokens = total_chars / 4 cost_per_1k = {"gpt-4o": 0.0025, "gpt-4o-mini": 0.00015} rate = cost_per_1k.get(model, 0.001) return estimated_tokens / 1000 * rate def get_hit_rate_report(self) -> dict: total = sum(self.stats.values()) if total == 0: return {} return { "total_requests": total, "exact_hit_rate": self.stats["exact"] / total, "semantic_hit_rate": self.stats["semantic"] / total, "miss_rate": self.stats["miss"] / total, "overall_cache_rate": 1 - self.stats["miss"] / total }—## 缓存失效策略缓存最大的风险是"过时数据”——当底层信息更新后,缓存里的答案已经错误。pythonclass CacheInvalidationManager: """缓存失效管理""" def __init__(self, redis_client, semantic_cache): self.redis = redis_client self.semantic = semantic_cache def invalidate_by_topic(self, topic_keywords: list): """ 基于主题失效:当某个领域的知识更新时, 清除所有与该主题相关的缓存条目 """ # 语义缓存:删除包含关键词的条目 # Qdrant支持基于payload的过滤删除 from qdrant_client.models import Filter, FieldCondition, MatchAny self.semantic.client.delete( collection_name=self.semantic.collection, points_selector=Filter( must=[ FieldCondition( key="metadata.topics", match=MatchAny(any=topic_keywords) ) ] ) ) def invalidate_by_age(self, max_age_hours: int = 24): """按时间失效:清除超过指定时间的缓存""" cutoff_time = int(time.time()) - max_age_hours * 3600 self.semantic.client.delete( collection_name=self.semantic.collection, points_selector=Filter( must=[ FieldCondition( key="created_at", range={"lt": cutoff_time} ) ] ) ) def invalidate_on_prompt_change(self, old_system_prompt: str, new_system_prompt: str): """ Prompt变更时清除缓存。 系统提示改变后,之前的缓存结果可能不再适用 """ old_hash = hashlib.md5(old_system_prompt.encode()).hexdigest() # 删除所有使用旧系统提示的缓存 self.semantic.client.delete( collection_name=self.semantic.collection, points_selector=Filter( must=[ FieldCondition( key="metadata.system_prompt_hash", match={"value": old_hash} ) ] ) )—## 缓存效果量化实施缓存后,必须量化其效果:| 指标 | 未缓存 | 加缓存后 | 提升幅度 ||------|--------|---------|---------|| 平均响应时间 | 3.5s | 0.8s | -77% || API调用次数 | 10万/天 | 6.5万/天 | -35% || 每日API费用 | $250 | $163 | -35% || 语义缓存命中率 | - | 25% | - || 精确缓存命中率 | - | 10% | - |—## 最佳实践总结场景匹配:- 高重复度的查询(客服/FAQ)→ 精确缓存优先- 语义相似的自然语言查询 → 语义缓存- 长系统提示+文档的查询 → Prompt前缀缓存注意事项:1.不要缓存高temperature输出:随机性高的结果缓存意义不大2.敏感信息隔离:涉及个人数据的查询不应缓存,或做数据脱敏3.缓存TTL按内容稳定性设置:实时数据短TTL,静态文档长TTL4.监控缓存命中率:低于10%说明缓存策略需要调整LLM缓存工程的终极目标:让用户感觉不到等待,让成本控制在可接受范围。这不是简单的性能优化,而是决定LLM应用能否规模化运营的关键工程能力。