Sign In
Free Sign Up
  • English
  • Español
  • 简体中文
  • Deutsch
  • 日本語
Sign In
Free Sign Up
  • English
  • Español
  • 简体中文
  • Deutsch
  • 日本語

使用 VoyageAI、Gemini 和 LangGraph 构建 Agentic RAG 应用

检索增强生成(RAG) (opens new window)作为人工智能领域的重大进步,改变了聊天机器人 (opens new window)与用户互动的方式。RAG将检索方法与生成式人工智能相结合,使聊天机器人能够从大型外部数据源中提取实时数据,并生成准确且相关的回答。这种创新简化了聊天机器人的开发流程,并确保其能够适应不断变化的数据,这对于客户支持 (opens new window)等需要及时准确信息的领域至关重要。

然而,传统的 RAG 系统在需要快速决策的场景中(例如预约或处理实时请求)表现欠佳。Agentic RAG 应用通过引入智能体 (opens new window)来解决这个问题。这些智能体能够自主地检索、验证和处理数据,赋予人工智能在行动中做出决策的能力。与主要用于检索数据和生成回答的标准 RAG 不同,Agentic RAG 应用更适合处理需要快速准确决策的复杂情况,例如医学诊断或客户服务。

Agentic RAG 应用工作流程

本教程将指导你构建一个智能的主动问答 (Q&A) 系统,该系统可以根据用户的查询动态决定是使用知识库 (opens new window)还是进行互联网搜索。为此,我们将集成以下工具:

LangChain (opens new window):作为系统的核心框架,LangChain 将管理系统与语言模型和其他工具的交互。它将根据用户查询,智能地选择搜索知识库还是互联网。

MyScaleDB (opens new window):作为高性能向量数据库,MyScaleDB 将存储表示知识库的 embedding (opens new window) 向量。当用户查询与知识库相关时,系统将利用 MyScaleDB 高效地搜索和匹配相关信息。

VoyageAI (opens new window): VoyageAI 提供多种 embedding 模型,用于从文本中生成 embedding 向量,从而更好地编码知识以进行搜索和匹配。

Tavily (opens new window):当用户的问题需要最新或外部数据时,Tavily 将从互联网获取实时信息,确保系统始终提供准确的答案。

# 设置环境

在开始构建 Agentic RAG 应用之前,你需要安装必要的 Python 库并获取所需的 API 密钥:

  1. 安装 Python 库
pip install -U langchain-google-genai langchain-voyageai langchain-core langchain-community

这将安装 LangChain 及其相关组件,包括与 Google 生成式 AI、VoyageAI 和 MyScaleDB 交互的模块。

  1. 设置 API 密钥

接下来是设置所需的API密钥以使用 Gemini (opens new window)(Google 生成式 AI)和 Tavily 搜索 (opens new window)。对于 MyScale,用户可以按照此快速入门 (opens new window)指南进行操作。

import os

# MyScale API凭据
os.environ["MYSCALE_HOST"] = "msc-24862074.us-east-1.aws.myscale.com"
os.environ["MYSCALE_PORT"] = "443"
os.environ["MYSCALE_USERNAME"] = "your_myscale_username"
os.environ["MYSCALE_PASSWORD"] = "your_myscale_password"

# Tavily API密钥
os.environ["TAVILY_API_KEY"] = "your_tavily_api_key"

# Gemini的Google API密钥
os.environ["GOOGLE_API_KEY"] = "your_google_api_key"

请将上述代码中的占位符替换为你的实际 API 密钥。

注意:所有这些工具都提供免费版本以测试其功能。因此,可以在各自的平台上创建帐户并获取 API 密钥。

# 读取和拆分文本

接下来,我们需要准备知识库数据。为了提高系统处理和检索信息的效率,我们需要将文本数据拆分成更小的块。LangChain 提供了 CharacterTextSplitter 工具,可以根据字符数量将文本拆分为可管理的块:

from langchain_text_splitters import CharacterTextSplitter

with open("myscaledb_summary.txt") as f:
    state_of_the_union = f.read()

text_splitter = CharacterTextSplitter(
    separator="\\n\\n",
    chunk_size=1000,
    chunk_overlap=200,
    length_function=len,
    is_separator_regex=False,
)

texts = text_splitter.create_documents([state_of_the_union])

CharacterTextSplitter 根据字符计数将文本拆分成较小的可管理块。

注意:本教程将使用一个包含 MyScaleDB 基本信息的 txt 作为示例。你可以根据实际需求替换为其他数据源。

# 将数据添加到知识库

完成文本数据拆分后,我们需要生成 embedding 向量并将它们存储到知识库中。我们将使用 VoyageAI 的 embedding 模型生成文本块的向量表示,并将它们存储在 MyScaleDB 数据库中:

from langchain_voyageai import VoyageAIEmbeddings
from langchain_community.vectorstores import MyScale

embeddings = VoyageAIEmbeddings(
    voyage_api_key="your_voyageai_api_key",
    model="voyage-law-2"
)

vectorstore = MyScale.from_documents(
    texts,
    embeddings,
)

retriever = vectorstore.as_retriever()

.from_documents (opens new window)方法会自动将文本文档转换为 embedding 向量,并将它们存储到 MyScaleDB 数据库中。

# 为模型创建工具

在 LangChain 中,工具 (opens new window)是智能体可以利用的功能,用于执行超出文本生成的特定任务。通过为模型配备工具,它可以与外部数据源进行交互,获取实时信息,并为用户查询提供更准确和相关的答案。

在本应用程序中,我们为智能体开发了两个不同的工具,用于根据用户的查询决定使用哪个工具:

  • MyScaleDB 检索工具: 当查询与 MyScaleDB 或 MyScale 相关时,智能体将使用该工具从我们自定义的知识库中检索信息。
  • Tavily 搜索工具: 对于需要最新信息的其他查询,智能体将使用该工具执行实时互联网搜索。

# MyScale 内容检索工具

我们可以使用 create_retriever_tool 方法将上面创建的 MyScaleDB 向量数据库转换为 LangChain 工具:

from langchain.tools.retriever import create_retriever_tool

retriever_tool = create_retriever_tool(
    retriever,
    "retrieve_myscale_content",
    "用于返回有关MyScaleDB的信息。",
)

# 用于实时互联网数据的 Tavily 搜索工具

LangChain 通过其社区包提供了对 Tavily 搜索工具 (opens new window)的内置支持:

from langchain_community.tools import TavilySearchResults

tool = TavilySearchResults(
    max_results=5,
    search_depth="advanced",
    include_answer=True,
    name="live_search",
    description="用于搜索互联网上的最新新闻。",
)

tools = [retriever_tool, tool]

TavilySearchResults 工具被设置为查找最新的新闻,最多有 5 个结果和高级搜索深度,包括直接答案。最后,将这两个工具添加到一个列表中。

注意:每个工具的描述非常重要,因为它们帮助智能体确定何时以及如何使用每个工具。

Boost Your AI App Efficiency now
Sign up for free to benefit from 150+ QPS with 5,000,000 vectors
Free Trial
Explore our product

# 定义应用程序的工作流程

定义好所需的工具后,我们需要设计 Agentic RAG 应用的完整工作流程,包括具体步骤和执行的检查:

  1. 用户查询分析: 当用户提交查询时,智能体首先会对其进行分析,理解其意图和上下文。
  2. 工具选择: 根据分析结果,智能体将决定使用哪个工具来获取信息:
    • 如果查询与 MyScaleDB 或 MyScale 相关,则选择 MyScaleDB 检索工具从知识库中检索信息。
    • 如果查询需要最新信息或与知识库无关,则选择 Tavily 搜索工具执行实时互联网搜索。
  3. 信息检索:
    • MyScaleDB 检索工具: 智能体将使用用户查询作为检索条件,从 MyScaleDB 数据库中检索相关的文本块。
    • Tavily 搜索工具: 智能体将使用用户查询作为搜索词,调用 Tavily 搜索 API 获取实时互联网信息。
  4. 相关性检查 (仅限 MyScaleDB 检索工具): 对于从 MyScaleDB 检索到的文本块,智能体会进行相关性检查,确保检索到的信息与用户查询相关:
    • 如果文本块相关,则直接进入步骤 5。
    • 如果文本块不相关,则智能体会尝试对用户查询进行改写,并使用改写后的查询重新执行步骤 3。
  5. 答案生成: 智能体将检索到的相关信息(来自 MyScaleDB 或 Tavily)传递给大型语言模型 (LLM)。LLM 结合这些信息和用户查询,生成最终的答案。

应用程序工作流程

# 定义智能体和状态

首先,定义一个 AgentState 类,用于跟踪整个过程中响应的状态。

# 标准库导入
from typing import Literal, Annotated, Sequence, TypedDict

# Langchain Core导入
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field

# Langchain和外部工具
from langchain import hub
from langchain_google_genai import ChatGoogleGenerativeAI

# LangGraph导入
from langgraph.graph.message import add_messages

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], add_messages]

现在,让我们定义智能体,它将分析用户的查询并决定是使用知识库还是执行网络搜索。Gemini 模型用于选择正确的工具,并根据查询生成答案。

def agent(state):
    messages = state["messages"]
    model = ChatGoogleGenerativeAI(
        model="gemini-1.5-pro",
        temperature=0,
        max_tokens=None,
        timeout=None,
        max_retries=2,
    )
    model = model.bind_tools(tools)
    response = model.invoke(messages)
    return {"messages": [response]}

# 对检索到的文档进行评分

智能体确保检索到的文档与用户的查询相关。grade_documents 函数将分析检索到的文档以检查其相关性。如果检索到的文档相关,智能体将决定生成答案;否则,它将重写用户的查询。

def grade_documents(state) -> Literal["generate", "rewrite"]:
    # 评分的数据模型
    class Grade(BaseModel):
        """用于相关性检查的二进制分数。"""
        binary_score: str = Field(description="相关性分数 'yes' 或 'no'")

    # LLM
    model = ChatGoogleGenerativeAI(
        model="gemini-1.5-pro",
        temperature=0,
        max_tokens=None,
        timeout=None,
        max_retries=2,
    )
    
    llm_with_tool = model.with_structured_output(Grade)

    # 提示
    prompt = PromptTemplate(
        template="""你是一个评分员,正在评估检索到的文档与用户问题的相关性。 
        这是检索到的文档:

        {context}

        这是用户问题:{question}
        如果文档包含与用户问题相关的关键词或语义意义,请将其评分为相关。
        给出一个二进制分数 'yes' 或 'no',以指示文档是否与问题相关。""",
        input_variables=["context", "question"],
    )

    # 链
    chain = prompt | llm_with_tool

    messages = state["messages"]
    last_message = messages[-1]

    question = messages[0].content
    docs = last_message.content

    scored_result = chain.invoke({"question": question, "context": docs})

    score = scored_result.binary_score

    if score == "yes":
        return "generate"
    else:
        return "rewrite"

# 重写用户查询

如果检索到的文档不相关,智能体将重写用户的查询,以改进从知识库中检索到的结果。

def rewrite(state):
    messages = state["messages"]
    question = messages[0].content

    msg = [
        HumanMessage(
            content=f""" 
        查看输入并尝试推理出底层的语义意图/含义。
        这是初始问题:

        -------
        {question} 
        -------
        制定一个改进的问题:""",
        )
    ]

    # LLM
    model = ChatGoogleGenerativeAI(
        model="gemini-1.5-pro",
        temperature=0,
        max_tokens=None,
        timeout=None,
        max_retries=2,
    )
    response = model.invoke(msg)
    return {"messages": [response]}

# 生成最终答案

一旦确认检索到的文档与用户的查询相关,智能体将检索到的文档发送给最终答案生成。

def generate(state):
    messages = state["messages"]
    question = messages[0].content
    last_message = messages[-1]

    docs = last_message.content

    # 提示
    prompt = hub.pull("rlm/rag-prompt")

    # LLM
    llm = ChatGoogleGenerativeAI(
        model="gemini-1.5-pro",
        temperature=0,
        max_tokens=None,
        timeout=None,
        max_retries=2,
    )

    # 链
    rag_chain = prompt | llm | StrOutputParser()

    # 运行
    response = rag_chain.invoke({"context": docs, "question": question})
    return {"messages": [response]}

# 将所有内容放入 LangGraph

我们使用 LangGraph (opens new window) 来简化应用程序的工作流程。它有助于管理智能体将采取的不同步骤,例如决定是使用知识库还是网络搜索、检索相关信息和生成答案。通过将这些任务组织成节点并定义它们之间的明确路径,LangGraph 确保一切运行顺利高效。它简化了处理复杂决策,并使应用程序结构化和易于维护。

LangGraph logo

为了制作一个图表,让我们指定节点(操作)以及它们之间的连接方式。

from langgraph.graph import END, StateGraph, START
from langgraph.prebuilt import ToolNode

# 定义一个新的图表
workflow = StateGraph(AgentState)

# 定义我们将在之间循环的节点
workflow.add_node("agent", agent)  # 智能体节点
retrieve = ToolNode([retriever_tool])
workflow.add_node("retrieve", retrieve)  # 检索节点
search = ToolNode([tool])
workflow.add_node("search", search)  # 搜索节点
workflow.add_node("rewrite", rewrite)  # 重写节点
workflow.add_node("generate", generate)  # 生成节点

add_node函数将节点添加到图表中。第一个参数是节点的名称,第二个参数是它所代表的函数。

# 定义边缘

在 LangGraph 中,节点代表不同的处理步骤,而边缘定义了基于特定条件的节点之间的转换关系。边缘控制着应用程序的完整流程。

# 为工具定义条件边缘

首先,我们需要定义一个条件判断节点,根据用户的查询选择使用 MyScaleDB 检索工具还是 Tavily 搜索工具。如果查询中包含关键词 “myscaledb” 或 “myscale”,则认为用户查询与 MyScaleDB 相关,智能体将使用 MyScaleDB 检索工具;否则,智能体将使用 Tavily 搜索工具。

def tools_condition(state) -> Literal["retrieve", "search"]:
    messages = state["messages"]
    question = messages[0].content.lower()

    if "myscaledb" in question or "myscale" in question:
        return "retrieve"
    else:
        return "search"

# 添加边缘

现在,将边缘添加到工作流图中。

workflow.add_edge(START, "agent")
workflow.add_conditional_edges(
    "agent",
    tools_condition,
    {
        "search": "search", 
        "retrieve": "retrieve", 
    },
)

workflow.add_conditional_edges(
    "retrieve",
    grade_documents,
)

workflow.add_edge("retrieve", "generate")
workflow.add_edge("search", "generate")
workflow.add_edge("generate", END)
workflow.add_edge("rewrite", "agent")

# 编译图表
graph = workflow.compile()

add_edge 函数定义了从一个节点到另一个节点的直接转换。add_conditional_edges 函数允许我们根据条件指定转换。第一个参数是源节点,第二个参数是条件函数,第三个参数是一个字典,将可能的条件结果映射到目标节点。

# 可视化最终图表

现在,我们来可视化最终的图表,看看节点和边缘是如何连接的。

from IPython.display import Image, display

try:
    display(Image(graph.get_graph(xray=True).draw_mermaid_png()))
except Exception:
    # 这需要一些额外的依赖项,是可选的
    pass
Final graph

# 执行图表

最后,执行图表,看看我们的问答系统如何运作。

inputs = {
    "messages": [
        ("user", "OpenAI最近发布了哪个模型?"),
    ]
}
for output in graph.stream(inputs):
    for value in output.items():
        print(value["messages"][0], indent=2, width=80, depth=None)

它将生成如下输出:

OpenAI最近发布了一个名为 OpenAI o1 的新生成AI模型。这个模型在“推理”能力方面表现出色,可以自我核实事实并解决编码和数学等复杂问题。它于2024912日发布,适用于 ChatGPT Plus 和 Team 用户。

当进行关于 MyScaleDB 的查询时,如下所示:

inputs = {
    "messages": [
        ("user", "什么是MyScaleDB?"),
    ]
}
for output in graph.stream(inputs):
    for key, value in output.items():
        print(value["messages"][0], indent=2, width=80, depth=None)

输出将类似于:

"MyScaleDB 是一个基于开源 ClickHouse 构建的高性能云数据库,专为人工智能和机器学习应用而设计。它支持结构化和非结构化数据,非常适合管理大量数据和执行复杂的分析任务。"
Join Our Newsletter

# 结论

现在,你已成功构建了一个动态的问答系统,它能够根据用户查询智能选择使用知识库还是执行实时互联网搜索。通过集成 LangChain、MyScaleDB、VoyageAI 和 Tavily 等工具,这个适应性强的 AI 智能体能够高效处理复杂问题。如果感兴趣的话, 你可以继续探索这些工具,以进一步增强你的 AI 应用程序!

Keep Reading
images
不要将未来建立在专门的向量数据库上

随着人工智能的兴起,向量数据库因其高效存储、管理和检索大规模高维数据的能力而受到了广泛关注。这种能力对于处理文本、图像和视频等非结构化数据的人工智能和生成式人工智能(GenAI)应用至关重要。 向量数据库的主要逻辑是提供相似性搜索功能,而不是传统数据库提供的关键字搜索。这个概念在大型语言模型(LLM)的性能提升中得到了广泛应用,特别是在ChatGPT发布之后。 LLM的最大问题是需要大量的资源 ...

images
如何快速上手 MyScale

数据是当今几乎所有组织的核心。随着数据量的不断增长,公司必须找到有效存储、处理和分析数据的方法。这导致了数据库市场的爆炸,公司既使用传统的 SQL 数据库,也使用较新的向量数据库来完成不同的任务。 然而,每种类型的数据库都有其权衡。传统的 SQL 数据库为结构化数据提供了一致性、准确性和易用性,而向量数据库则针对速度和可扩展性进行了优化,尤其是在处理大量非结构化数据时。但如果你不必选择呢?如果有 ...

images
使用 MyScale 构建支持 RAG 的聊天机器人

大型语言模型(LLM)在从知识库中检索上下文后,可以更可靠地提供真实答案,这就是检索增强生成(RAG)的原理。我们之前的博客讨论了RAG的性能优势和[成本与延迟的可行性](https://myscale.com/blog/zh/what-to-expect-rag ...

Start building your Al projects with MyScale today

Free Trial
Contact Us