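Prompt Chain Orchestration: A Divide-and-Conquer Strategy for Complex Tasks

Chain-of-Prompts, conditional routing, error propagation, and a LangChain LCEL implementation | 2026-02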


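1. Why Prompt Chains

A single prompt hits limits on complex tasks: the context window runs out, instruction-following degrades as the instructions pile up, and different subtasks call for different models or parameters. A prompt chain (Chain of Prompts) breaks a complex task into several simple steps, each driven by a prompt tuned for that step alone.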

Single Prompt (fragile):
  Complex Question -> [Mega Prompt] -> Answer (often wrong)

Chain of Prompts (robust):
  Complex Question -> [Classify] -> [Retrieve] -> [Synthesize] -> [Validate] -> Answer
                         |             |              |              |
                      Simple,        Focused,       Specialized,   Quality
                      reliable       efficient      accurate       checked
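
The four stages in the diagram can be sketched in plain Python, one small function per stage. This is a minimal sketch under stated assumptions: the stage functions and the `KNOWLEDGE` dictionary are hypothetical stand-ins introduced here for illustration, and in a real chain each stage would call a model or a retriever instead of a stub.

```python
# Hypothetical stand-in for retrieval; a real chain would query a vector store.
KNOWLEDGE = {
    "billing": "Invoices are issued on the 1st of each month.",
    "general": "Support is available on weekdays.",
}


def classify(question: str) -> str:
    """Stage 1: one narrow decision (stubbed with a keyword check)."""
    return "billing" if "invoice" in question.lower() else "general"


def retrieve(category: str) -> str:
    """Stage 2: fetch context for the category; no LLM involved."""
    return KNOWLEDGE[category]


def synthesize(question: str, context: str) -> str:
    """Stage 3: compose an answer from the question and the context."""
    return f"Q: {question} | Context: {context}"


def validate(answer: str) -> str:
    """Stage 4: a cheap sanity check before the answer is returned."""
    if "Context:" not in answer:
        raise ValueError("answer is missing its supporting context")
    return answer


def run_chain(question: str) -> str:
    """Run the four stages in order, passing each output to the next stage."""
    category = classify(question)
    context = retrieve(category)
    return validate(synthesize(question, context))
```

Because each stage has a single input and a single output, any one of them can be tested, swapped, or moved to a different model without touching the others.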

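2. Chain Orchestration Patterns

2.1 Basic Patterns

| Pattern | Description | Use case |
|---|---|---|
| Sequential chain | A -> B -> C | Fixed workflows |
| Conditional chain | A -> if(X) B else C | Branching after classification |
| Parallel chain | A -> [B, C] -> D | Independent subtasks |
| Loop chain | A -> B -> (check) -> A | Self-correction |
| Tree chain | A -> [B -> D, C -> E] -> F | Complex decomposition |
| Map-Reduce | Split -> [Process...] -> Merge | Batch processing |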
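
The tree chain row, A -> [B -> D, C -> E] -> F, is the one pattern in the table without its own implementation section, so here is a minimal sketch using `asyncio`. The `step` function is a hypothetical stand-in for an LLM call (it only tags its input), so the shape of the data flow is visible in the result.

```python
import asyncio


async def step(name: str, value: str) -> str:
    """Hypothetical stand-in for one LLM call; it only tags the value."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    return f"{name}({value})"


async def branch(first: str, second: str, value: str) -> str:
    """One branch of the tree: two steps run in sequence."""
    return await step(second, await step(first, value))


async def tree_chain(text: str) -> str:
    root = await step("A", text)
    # The two branches are independent, so they run concurrently.
    left, right = await asyncio.gather(
        branch("B", "D", root),
        branch("C", "E", root),
    )
    # F merges the results of both branches.
    return await step("F", f"{left}+{right}")
```

Calling `asyncio.run(tree_chain("x"))` returns a nested string that spells out the order in which the steps were applied.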

2.2 Architecture Diagrams

Pattern 1: Sequential Chain
  Input -> [Step 1] -> [Step 2] -> [Step 3] -> Output

Pattern 2: Conditional Chain (Router)
  Input -> [Classifier] --"tech"--> [Tech Support]
                        --"billing"--> [Billing Agent]
                        --"general"--> [General FAQ]

Pattern 3: Parallel Chain
  Input -> [Extract Entities] ---|
        -> [Analyze Sentiment] --+--> [Merge & Synthesize] -> Output
        -> [Detect Language]   ---|

Pattern 4: Self-Correcting Loop
  Input -> [Generate] -> [Validate] --pass--> Output
                             |
                           fail
                             |
                             v
                       [Fix with feedback] ---> [Validate]
                       (max 3 iterations)

Pattern 5: Map-Reduce
  Long Document -> [Split into chunks]
                        |
               +--------+--------+
               |        |        |
          [Summarize] [Summarize] [Summarize]
               |        |        |
               +--------+--------+
                        |
                   [Merge summaries] -> Final Summary

3. LangChain LCEL Implementation

3.1 LCEL Basics

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_openai import ChatOpenAI

# LCEL: LangChain Expression Language
# Uses the pipe (|) operator to chain components

# Simple chain: prompt | model | parser
classify_prompt = ChatPromptTemplate.from_messages([
    ("system", "Classify the user query into: tech_support, billing, general"),
    ("user", "{query}"),
])

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()

# This creates a Runnable chain
classify_chain = classify_prompt | model | parser

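# Execute. Top-level await works in a notebook or async REPL;
# in a script, wrap the call in an async function and run it with asyncio.run().
result = await classify_chain.ainvoke({"query": "My invoice is wrong"})
# Expected: a label such as "billing" (model output is not guaranteed)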
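
How the pipe operator composes steps can be illustrated with a toy class. This is a sketch of the idea only, not LangChain's implementation: `Step`, `toy_prompt`, `toy_model`, and `toy_parser` are hypothetical names introduced here, and the "model" is a keyword check rather than an LLM.

```python
from typing import Any, Callable


class Step:
    """Toy composable step; NOT LangChain's Runnable, just the same idea."""

    def __init__(self, fn: Callable[[Any], Any]) -> None:
        self.fn = fn

    def __or__(self, other: "Step") -> "Step":
        # a | b builds a new step that feeds a's output into b.
        return Step(lambda value: other.invoke(self.invoke(value)))

    def invoke(self, value: Any) -> Any:
        return self.fn(value)


# Hypothetical stand-ins for a prompt template, a model, and an output parser.
toy_prompt = Step(lambda d: f"Classify: {d['query']}")
toy_model = Step(
    lambda text: {"content": "billing" if "invoice" in text else "general"}
)
toy_parser = Step(lambda message: message["content"])

toy_chain = toy_prompt | toy_model | toy_parser
```

`toy_chain.invoke({"query": "My invoice is wrong"})` walks the three steps left to right, which is the same reading order as the `prompt | model | parser` expression.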

3.2 Conditional Routing

from langchain_core.runnables import RunnableLambda, RunnablePassthrough

# Define specialized chains for each category
tech_chain = (
    ChatPromptTemplate.from_messages([
        ("system", "You are a technical support specialist. Diagnose and solve the issue."),
        ("user", "{query}"),
    ])
    | ChatOpenAI(model="gpt-4o", temperature=0.3)
    | StrOutputParser()
)

billing_chain = (
    ChatPromptTemplate.from_messages([
        ("system", "You are a billing specialist. Help with invoice and payment issues."),
        ("user", "{query}"),
    ])
    | ChatOpenAI(model="gpt-4o-mini", temperature=0.3)  # Cheaper model for billing
    | StrOutputParser()
)

general_chain = (
    ChatPromptTemplate.from_messages([
        ("system", "You are a helpful general assistant."),
        ("user", "{query}"),
    ])
    | ChatOpenAI(model="gpt-4o-mini", temperature=0.5)
    | StrOutputParser()
)

# Router function
def route(info: dict) -> object:
    category = info["category"].strip().lower()
    if "tech" in category:
        return tech_chain
    elif "billing" in category:
        return billing_chain
    else:
        return general_chain

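# Complete routing chain.
# The input is {"query": ...}; extract the string so downstream prompts receive text.
# RunnablePassthrough() here would forward the whole input dict as "query".
full_chain = (
    {"category": classify_chain, "query": lambda x: x["query"]}
    | RunnableLambda(route)
)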

# Execute
result = await full_chain.ainvoke({"query": "My invoice is wrong"})
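
Routing depends on the classifier returning a usable label, and free-form model output often carries extra words, quotes, or punctuation. The sketch below shows one way to normalize that output before routing. `ALLOWED` and `normalize_category` are hypothetical names introduced here, and the label set is taken from the classification prompt in section 3.1.

```python
ALLOWED = ("tech_support", "billing", "general")


def normalize_category(raw: str, default: str = "general") -> str:
    """Map free-form classifier output onto one of the allowed labels."""
    cleaned = raw.strip().strip("\"'.").lower().replace(" ", "_")
    if cleaned in ALLOWED:
        return cleaned
    # Fall back to a substring match, e.g. "Category: billing".
    matches = [label for label in ALLOWED if label in cleaned]
    # Route to the default when the output is ambiguous or unrecognized.
    return matches[0] if len(matches) == 1 else default
```

Sending ambiguous output to the default branch is a design choice: a wrong specialist answer is usually worse than a general one.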

3.3 Parallel Chains

from langchain_core.runnables import RunnableParallel

# Run multiple analyses in parallel
parallel_analysis = RunnableParallel(
    entities=(
        ChatPromptTemplate.from_messages([
            ("system", "Extract named entities (people, companies, locations). Return a JSON object keyed by entity type."),
            ("user", "{text}"),
        ])
        | model
        | JsonOutputParser()
    ),
    sentiment=(
        ChatPromptTemplate.from_messages([
            ("system", 'Analyze sentiment. Return a JSON object with keys "label" (positive, negative, or neutral) and "score".'),
            ("user", "{text}"),
        ])
        | model
        | JsonOutputParser()
    ),
    topics=(
        ChatPromptTemplate.from_messages([
            ("system", "Extract main topics (max 5). Return a JSON array of strings."),
            ("user", "{text}"),
        ])
        | model
        | JsonOutputParser()
    ),
)

# Merge results
synthesis_chain = (
    parallel_analysis
    | ChatPromptTemplate.from_messages([
        ("system", "Synthesize the analysis into a comprehensive report."),
        ("user", "Entities: {entities}\nSentiment: {sentiment}\nTopics: {topics}"),
    ])
    | model
    | StrOutputParser()
)

# article_text is the document to analyze, a str supplied by the caller
report = await synthesis_chain.ainvoke({"text": article_text})
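
Why the parallel pattern cuts latency can be seen without any model calls. In the sketch below, `analysis` is a hypothetical stand-in for one analysis chain, and the `events` list records when each one starts and ends. All three start before any of them finishes, so total latency is close to the slowest branch rather than the sum of all three.

```python
import asyncio

events: list[str] = []


async def analysis(name: str, delay: float) -> str:
    """Hypothetical stand-in for one analysis chain (an LLM call)."""
    events.append(f"start:{name}")
    await asyncio.sleep(delay)
    events.append(f"end:{name}")
    return f"{name}-result"


async def run_parallel() -> dict[str, str]:
    names = ["entities", "sentiment", "topics"]
    # All three coroutines are in flight at once; gather keeps input order.
    results = await asyncio.gather(*(analysis(n, 0.01) for n in names))
    return dict(zip(names, results))
```

The returned dictionary has the same shape as the input the synthesis prompt expects: one key per analysis.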

4. Self-Correcting Loops

4.1 Validate-and-Fix Loop

import json

from pydantic import BaseModel, ValidationError

class AnalysisResult(BaseModel):
    summary: str
    key_points: list[str]
    confidence: float

async def generate_with_validation(
    query: str, max_retries: int = 3,
) -> AnalysisResult:
    """Generate structured output with self-correction loop."""
    generate_prompt = ChatPromptTemplate.from_messages([
        ("system", "Analyze the query and reply with a JSON object only, with keys: summary (string), key_points (list of strings), confidence (number)."),
        ("user", "{query}"),
    ])

    fix_prompt = ChatPromptTemplate.from_messages([
        ("system", "Fix the JSON output based on the validation error."),
        ("user", "Original output:\n{output}\n\nError:\n{error}\n\nFix it:"),
    ])

    generate_chain = generate_prompt | model | StrOutputParser()
    fix_chain = fix_prompt | model | StrOutputParser()

    output = await generate_chain.ainvoke({"query": query})

    for attempt in range(max_retries):
        try:
            parsed = json.loads(output)
            return AnalysisResult.model_validate(parsed)
        except (json.JSONDecodeError, ValidationError) as e:
            if attempt == max_retries - 1:
                raise
            output = await fix_chain.ainvoke({
                "output": output, "error": str(e),
            })

    # Reached only when max_retries < 1, because the loop body then never runs
    raise ValueError("Failed to generate valid output")
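
One practical detail of the loop above: `json.loads` fails if the model wraps its JSON in a Markdown code fence, which can cost a repair round on output that was otherwise valid. The helper below is a minimal sketch of stripping such a fence before parsing. `extract_json` and `FENCE` are hypothetical names introduced here, and the assumption that the model uses a standard triple-backtick fence is mine, not the article's.

```python
import json
import re
from typing import Any

# Matches a fence of three backticks, with an optional "json" language tag.
FENCE = re.compile(r"`{3}(?:json)?\s*(.*?)`{3}", re.DOTALL)


def extract_json(raw: str) -> Any:
    """Parse JSON from model output, tolerating a Markdown code fence."""
    match = FENCE.search(raw)
    candidate = match.group(1) if match else raw
    # json.JSONDecodeError still propagates, so a repair loop can catch it.
    return json.loads(candidate.strip())
```

Under that assumption, replacing `json.loads(output)` with `extract_json(output)` in the loop would keep the same exception types flowing into the `except` clause.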

4.2 Quality Gate

async def chain_with_quality_gate(query: str) -> str:
    """Chain with intermediate quality check."""
    # Step 1: Generate draft
    draft_chain = (
        ChatPromptTemplate.from_messages([
            ("system", "Write a detailed analysis."),
            ("user", "{query}"),
        ])
        | ChatOpenAI(model="gpt-4o")
        | StrOutputParser()
    )

    # Step 2: Quality check (cheaper model as judge)
    # Literal braces in the template are doubled so they are not parsed as variables.
    quality_chain = (
        ChatPromptTemplate.from_messages([
            ("system", """Rate this analysis on a scale of 1-10.
Output JSON: {{"score": N, "issues": ["issue1", ...]}}
Score >= 7 = pass. Below 7 = needs improvement."""),
            ("user", "Query: {query}\n\nAnalysis: {draft}"),
        ])
        | ChatOpenAI(model="gpt-4o-mini", temperature=0)
        | JsonOutputParser()
    )

    # Step 3: Improve if needed
    improve_chain = (
        ChatPromptTemplate.from_messages([
            ("system", "Improve this analysis based on the feedback."),
            ("user", "Original: {draft}\n\nIssues: {issues}\n\nImproved version:"),
        ])
        | ChatOpenAI(model="gpt-4o")
        | StrOutputParser()
    )

    draft = await draft_chain.ainvoke({"query": query})
    quality = await quality_chain.ainvoke({"query": query, "draft": draft})

    if quality["score"] >= 7:
        return draft
    else:
        improved = await improve_chain.ainvoke({
            "draft": draft,
            "issues": "\n".join(quality["issues"]),
        })
        return improved
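
The function above improves the draft once and returns the result without judging it again. A bounded variant that re-judges after each improvement might look like the sketch below. It is synchronous and takes the judge and the improver as plain callables, so it can be exercised with stubs; `refine_until_pass` and its parameters are hypothetical names introduced here.

```python
from typing import Callable


def refine_until_pass(
    draft: str,
    judge: Callable[[str], dict],
    improve: Callable[[str, list[str]], str],
    threshold: int = 7,
    max_rounds: int = 2,
) -> str:
    """Re-judge after each improvement, for at most max_rounds improvements."""
    for _ in range(max_rounds):
        verdict = judge(draft)
        if verdict["score"] >= threshold:
            return draft
        draft = improve(draft, verdict["issues"])
    # Round budget spent: return the latest draft without judging it again.
    return draft
```

The round limit matters because every extra round adds two model calls, and a judge that never passes a draft would otherwise loop forever.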

5. Map-Reduce Pattern

5.1 Document Summarization

from langchain_text_splitters import RecursiveCharacterTextSplitter

async def map_reduce_summary(document: str) -> str:
    """Summarize a long document using map-reduce."""
    # Split document into chunks
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=4000, chunk_overlap=200,
    )
    chunks = splitter.split_text(document)

    # Map: Summarize each chunk
    map_chain = (
        ChatPromptTemplate.from_messages([
            ("system", "Summarize this text section concisely. Keep key facts and numbers."),
            ("user", "{chunk}"),
        ])
        | ChatOpenAI(model="gpt-4o-mini")
        | StrOutputParser()
    )

    # Process chunks concurrently; gather returns results in input order
    import asyncio
    summaries = await asyncio.gather(*[
        map_chain.ainvoke({"chunk": chunk}) for chunk in chunks
    ])

    # Reduce: Merge summaries
    reduce_chain = (
        ChatPromptTemplate.from_messages([
            ("system", """Merge these section summaries into one coherent summary.
Eliminate redundancy. Preserve key facts. Keep under 500 words."""),
            ("user", "Section summaries:\n\n{summaries}"),
        ])
        | ChatOpenAI(model="gpt-4o")
        | StrOutputParser()
    )

    combined = "\n\n---\n\n".join(
        f"Section {i+1}: {s}" for i, s in enumerate(summaries)
    )
    final = await reduce_chain.ainvoke({"summaries": combined})
    return final
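
For a long document, launching every map call at once may run into provider rate limits. One common remedy is to cap the number of calls in flight with a semaphore, as in the sketch below. `bounded_map` is a hypothetical helper introduced here, and `worker` stands for any async function such as a wrapper around `map_chain.ainvoke`.

```python
import asyncio
from typing import Awaitable, Callable


async def bounded_map(
    worker: Callable[[str], Awaitable[str]],
    items: list[str],
    limit: int = 4,
) -> list[str]:
    """Run worker over items with at most `limit` calls in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item: str) -> str:
        async with semaphore:
            return await worker(item)

    # gather returns results in the same order as `items`.
    return await asyncio.gather(*(guarded(item) for item in items))
```

The right value for `limit` depends on the account's rate limits and is an assumption here, not a recommendation from the article.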

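6. Error Propagation and Handling

6.1 Error-Handling Strategies

| Strategy | Description | Use case |
|---|---|---|
| Fail-fast | Stop at the first error | High-precision requirements |
| Fallback | Switch to a backup chain on failure | High-availability requirements |
| Retry | Retry on failure | Transient errors |
| Skip | Skip the failed step | Optional steps |
| Default | Return a default value | Non-critical steps |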
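
The Skip and Default strategies are simple enough to sketch as wrappers around any step function. `with_default` and `skippable` are hypothetical helper names introduced here and are not LangChain APIs.

```python
from typing import Any, Callable


def with_default(step: Callable[[Any], Any], default: Any) -> Callable[[Any], Any]:
    """Default strategy: return a fixed value when the step raises."""

    def wrapped(value: Any) -> Any:
        try:
            return step(value)
        except Exception:
            return default

    return wrapped


def skippable(step: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """Skip strategy: pass the input through unchanged when the step raises."""

    def wrapped(value: Any) -> Any:
        try:
            return step(value)
        except Exception:
            return value

    return wrapped
```

Both wrappers swallow the exception, so they suit only the optional or non-critical steps named in the table. A step whose failure should stop the chain is left unwrapped, which gives fail-fast behavior.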

6.2 Error Handling in LCEL

# Reuses classify_prompt and parser from section 3.1; any prompt and parser would do.
prompt = classify_prompt

# Fallback chains
primary_chain = (
    prompt | ChatOpenAI(model="gpt-4o") | parser
)
fallback_chain = (
    prompt | ChatOpenAI(model="gpt-4o-mini") | parser
)

# Use with_fallbacks for automatic failover
robust_chain = primary_chain.with_fallbacks([fallback_chain])

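# Use with_retry for transient errors.
# Note: the builtin TimeoutError/ConnectionError may not cover a provider SDK's own
# exception classes; add those to the tuple if the client raises its own types.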
retrying_chain = primary_chain.with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True,
    retry_if_exception_type=(TimeoutError, ConnectionError),
)
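
What a retry with exponential backoff and jitter does can be sketched in a few lines of plain Python. This illustrates the general technique and is not how LangChain implements `with_retry`. The `retry` function and its parameters are hypothetical names introduced here, and `sleep` is injectable so the behavior can be checked without waiting.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry(
    fn: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: tuple[type[BaseException], ...] = (TimeoutError, ConnectionError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn until it succeeds, waiting longer after each failure."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise  # budget spent: surface the last error
            # Exponential backoff (base, 2x, 4x, ...) plus random jitter.
            delay = base_delay * 2 ** (attempt - 1)
            sleep(delay + random.uniform(0, base_delay))
    raise ValueError("attempts must be at least 1")
```

Exceptions outside `retry_on` propagate immediately, which keeps non-transient errors such as a malformed request from being retried pointlessly.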

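7. Performance Optimization

7.1 Optimization Strategies

| Strategy | Effect | How |
|---|---|---|
| Parallel execution | Lower total latency | RunnableParallel |
| Streaming output | Lower perceived latency | .astream() |
| Model tiering | Lower cost | Small models for simple steps |
| Caching | Fewer repeated calls | LangChain Cache |
| Batching | Higher throughput | .abatch() |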
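
The caching row can be illustrated with a small in-memory cache keyed by model name and exact prompt text. This is a sketch of the idea, not LangChain's cache: `PromptCache` and `call_model` are hypothetical names introduced here. Exact-match caching of this kind only makes sense for steps where repeating an earlier answer is acceptable, such as temperature-0 classification.

```python
import hashlib
import json
from typing import Callable


class PromptCache:
    """In-memory cache keyed by model name plus the exact prompt text."""

    def __init__(self, call_model: Callable[[str, str], str]) -> None:
        self.call_model = call_model
        self.store: dict[str, str] = {}
        self.hits = 0

    def _key(self, model: str, prompt: str) -> str:
        # Serialize as a JSON list so ("ab", "c") and ("a", "bc") cannot collide.
        payload = json.dumps([model, prompt], ensure_ascii=False)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def invoke(self, model: str, prompt: str) -> str:
        key = self._key(model, prompt)
        if key in self.store:
            self.hits += 1
            return self.store[key]
        result = self.call_model(model, prompt)
        self.store[key] = result
        return result
```

Including the model name in the key keeps a cheaper model's answer from being served for a request meant for a stronger one.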

7.2 Model Tiering Strategy

# Use different models for different chain steps
CHAIN_CONFIG = {
    "classify": {"model": "gpt-4o-mini", "temp": 0, "reason": "Simple classification"},
    "retrieve": {"model": None, "reason": "No LLM needed, vector search"},
    "synthesize": {"model": "gpt-4o", "temp": 0.3, "reason": "Complex generation"},
    "validate": {"model": "gpt-4o-mini", "temp": 0, "reason": "Binary judgment"},
    "format": {"model": "gpt-4o-mini", "temp": 0, "reason": "Formatting only"},
}

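# Cost estimate per 1000 queries (illustrative per-step costs, five LLM calls assumed):
# All gpt-4o:     $25 (5 calls x $5 each)
# Model tiering:  $8  (1 x $5 + 4 x $0.75)
# Savings: 68%
# With retrieve making no LLM call, as configured above, four calls remain:
# $20 vs $7.25 (1 x $5 + 3 x $0.75), a saving of about 64%.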
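
The arithmetic behind the estimate can be reproduced with two small functions. The per-step prices ($5 for the large model, $0.75 for the small one, per 1000 queries) are the article's illustrative figures, not measured costs, and `tiered_cost` and `savings` are hypothetical helper names. The sketch covers both the five-call count used in the estimate and the four-call count that follows from the `retrieve` step using no model.

```python
def tiered_cost(
    large_calls: int,
    small_calls: int,
    large_price: float = 5.0,
    small_price: float = 0.75,
) -> float:
    """Cost per 1000 queries, given per-step prices (illustrative figures)."""
    return large_calls * large_price + small_calls * small_price


def savings(baseline: float, tiered: float) -> float:
    """Fraction saved relative to the baseline."""
    return (baseline - tiered) / baseline


# Five LLM calls, as counted in the estimate.
five_all_large = tiered_cost(5, 0)  # 25.0
five_tiered = tiered_cost(1, 4)     # 8.0

# Four LLM calls, if the retrieve step uses no model.
four_all_large = tiered_cost(4, 0)  # 20.0
four_tiered = tiered_cost(1, 3)     # 7.25
```

Either way the saving is roughly two thirds (0.68 with five calls, 0.6375 with four), because the single large-model step dominates the tiered total.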

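8. Summary

Prompt chain orchestration is the key technique for lifting LLM use from a "single call" to a "composable system." Core design principles:

  1. Single responsibility: each chain node does exactly one thing
  2. Model matching: choose the model by step complexity; don't run every step on the most expensive one
  3. Error isolation: each node handles its own errors so that one failure doesn't bring down the whole chain
  4. Observability: each node logs its input, output, latency, and cost
  5. Parallel first: never serialize steps that can run in parallel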
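
The observability principle can be sketched as a wrapper that records each node's input, output, and latency. `traced` and `TRACE` are hypothetical names introduced here. Cost is left out because it depends on provider-specific token usage data that this sketch does not assume.

```python
import time
from typing import Any, Callable

TRACE: list[dict[str, Any]] = []


def traced(name: str, step: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """Wrap a step so that every call appends one record to TRACE."""

    def wrapped(value: Any) -> Any:
        started = time.perf_counter()
        record: dict[str, Any] = {"node": name, "input": value}
        try:
            record["output"] = step(value)
            return record["output"]
        except Exception as exc:
            record["error"] = repr(exc)
            raise  # tracing never hides a failure
        finally:
            # Runs on success and on failure, so every call leaves a record.
            record["latency_s"] = time.perf_counter() - started
            TRACE.append(record)

    return wrapped
```

In a real system the records would go to a logger or tracing backend rather than a module-level list; the list keeps the sketch self-contained.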

Maurice | maurice_wen@proton.me