开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(六)

 一、前言

    使用 FastAPI 可以帮助我们更简单高效地部署 AI 交互业务。FastAPI 提供了快速构建 API 的能力,开发者可以轻松地定义模型需要的输入和输出格式,并编写好相应的业务逻辑。

    FastAPI 的异步高性能架构,可以有效支持大量并发的预测请求,为用户提供流畅的交互体验。此外,FastAPI 还提供了容器化部署能力,开发者可以轻松打包 AI 模型为 Docker 镜像,实现跨环境的部署和扩展。

    总之,使用 FastAPI 可以大大提高 AI 应用程序的开发效率和用户体验,为 AI 模型的部署和交互提供全方位的支持。

    本篇在开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)基础上,学习如何集成Tool获取实时数据,并以流式方式返回


二、术语

2.1.Tool

    Tool(工具)是为了增强其语言模型的功能和实用性而设计的一系列辅助手段,用于扩展模型的能力。例如代码解释器(Code Interpreter)和知识检索(Knowledge Retrieval)等都属于其工具。

2.2.langchain预置的tools

    https://github.com/langchain-ai/langchain/tree/v0.1.16/docs/docs/integrations/tools

   基本这些工具能满足大部分需求,具体使用参见:

2.3.LangChain支持流式输出的方法

  • stream:基本的流式传输方式,能逐步给出代理的动作和观察结果。
  • astream:异步的流式传输,用于异步处理需求的情况。
  • astream_events:更细致的流式传输,能流式传输代理的每个具体事件,如工具调用和结束、模型启动和结束等,便于深入了解和监控代理执行的详细过程。

2.4.langchainhub

    是 LangChain 相关工具的集合中心,其作用在于方便开发者发现和共享常用的提示(Prompt)、链、代理等。

    它受 Hugging Face Hub 启发,促进社区交流与协作,推动 LangChain 生态发展。当前,它在新架构中被置于 LangSmith 里,主要聚焦于 Prompt。

2.5.asyncio

    是一个用于编写并发代码的标准库,它提供了构建异步应用程序的基础框架。


三、前置条件

3.1. 创建虚拟环境&安装依赖

  增加Google Search以及langchainhub的依赖包

conda create -n fastapi_test python=3.10
conda activate fastapi_test
pip install fastapi websockets uvicorn
pip install --quiet  langchain-core langchain-community langchain-openai
pip install google-search-results langchainhub

3.2. 注册Google Search API账号

参见:开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)

3.3. 生成Google Search API的KEY


四、技术实现

4.1. 使用Tool&流式输出

# -*- coding: utf-8 -*-
import asyncio
import os
from langchain.agents import  create_structured_chat_agent, AgentExecutor
from langchain_community.utilities.serpapi import SerpAPIWrapper
from langchain_core.prompts import SystemMessagePromptTemplate, HumanMessagePromptTemplate, ChatPromptTemplate
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

os.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'  # 你的Open AI Key
os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"


llm = ChatOpenAI(model="gpt-3.5-turbo",temperature=0,max_tokens=512)


@tool
def search(query:str):
    """只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""
    serp = SerpAPIWrapper()
    result = serp.run(query)
    print("实时搜索结果:", result)
    return result


tools = [search]

template='''
Respond to the human as helpfully and accurately as possible. You have access to the following tools:

{tools}

Use a json blob to specify a tool by providing an action key (tool name) and an action_input key (tool input).

Valid "action" values: "Final Answer" or {tool_names}

Provide only ONE action per $JSON_BLOB, as shown:

```

{{

  "action": $TOOL_NAME,

  "action_input": $INPUT

}}

```

Follow this format:

Question: input question to answer

Thought: consider previous and subsequent steps

Action:

```

$JSON_BLOB

```

Observation: action result

... (repeat Thought/Action/Observation N times)

Thought: I know what to respond

Action:

```

{{

  "action": "Final Answer",

  "action_input": "Final response to human"

}}

Begin! Reminder to ALWAYS respond with a valid json blob of a single action. Use tools if necessary. Respond directly if appropriate. Format is Action:```$JSON_BLOB```then Observation
'''
system_message_prompt = SystemMessagePromptTemplate.from_template(template)
human_template='''
{input}

{agent_scratchpad}

 (reminder to respond in a JSON blob no matter what)
'''
human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)
prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])


print(prompt)

agent = create_structured_chat_agent(
    llm, tools, prompt
)

agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)


async def chat(params):
    events = agent_executor.astream_events(params,version="v2")
    async for event in events:
        type = event['event']
        if 'on_chat_model_stream' == type:
            data = event['data']
            chunk =  data['chunk']
            content =  chunk.content
            if content and len(content) > 0:
                print(content)



asyncio.run(chat({"input": "广州现在天气如何?"}))

调用结果:

说明:

流式输出的数据结构为:

{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='天', id='run-92515b63-4b86-4af8-8515-2f84def9dfab')}, 'run_id': '92515b63-4b86-4af8-8515-2f84def9dfab', 'name': 'ChatOpenAI', 'tags': ['seq:step:3'], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 512, 'ls_stop': ['\nObservation']}}
type: on_chat_model_stream
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='气', id='run-92515b63-4b86-4af8-8515-2f84def9dfab')}, 'run_id': '92515b63-4b86-4af8-8515-2f84def9dfab', 'name': 'ChatOpenAI', 'tags': ['seq:step:3'], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 512, 'ls_stop': ['\nObservation']}}

4.2. 通过langchainhub使用公共prompt

   在4.1使用Tool&流式输出的代码基础上进行调整

# -*- coding: utf-8 -*-
import asyncio
import os
from langchain.agents import  create_structured_chat_agent, AgentExecutor
from langchain_community.utilities.serpapi import SerpAPIWrapper
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

os.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'  # 你的Open AI Key
os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

from langchain import hub

llm = ChatOpenAI(model="gpt-3.5-turbo",temperature=0,max_tokens=512)


@tool
def search(query:str):
    """只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""
    serp = SerpAPIWrapper()
    result = serp.run(query)
    print("实时搜索结果:", result)
    return result


tools = [search]

prompt = hub.pull("hwchase17/structured-chat-agent")

print(prompt)

agent = create_structured_chat_agent(
    llm, tools, prompt
)

agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)



async def chat(params):
    events = agent_executor.astream_events(params,version="v2")
    async for event in events:
        type = event['event']
        if 'on_chat_model_stream' == type:
            data = event['data']
            chunk =  data['chunk']
            content =  chunk.content
            if content and len(content) > 0:
                print(content)


asyncio.run(chat({"input": "广州现在天气如何?"}))

调用结果:

4.3. 整合代码

在开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)的代码基础上进行调整

import uvicorn
import os

from typing import Annotated
from fastapi import (
    Depends,
    FastAPI,
    WebSocket,
    WebSocketException,
    WebSocketDisconnect,
    status,
)
from langchain import hub
from langchain.agents import create_structured_chat_agent, AgentExecutor
from langchain_community.utilities import SerpAPIWrapper

from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

os.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'  # 你的Open AI Key
os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"


class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

app = FastAPI()

async def authenticate(
    websocket: WebSocket,
    userid: str,
    secret: str,
):
    if userid is None or secret is None:
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)

    print(f'userid: {userid},secret: {secret}')
    if '12345' == userid and 'xxxxxxxxxxxxxxxxxxxxxxxxxx' == secret:
        return 'pass'
    else:
        return 'fail'

@tool
def search(query:str):
    """只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""
    serp = SerpAPIWrapper()
    result = serp.run(query)
    print("实时搜索结果:", result)
    return result


def get_prompt():
    prompt = hub.pull("hwchase17/structured-chat-agent")

    return prompt

async def chat(query):
    global llm,tools
    agent = create_structured_chat_agent(
        llm, tools, get_prompt()
    )

    agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)

    events = agent_executor.astream_events({"input": query}, version="v1")
    async for event in events:
        type = event['event']
        if 'on_chat_model_stream' == type:
            data = event['data']
            chunk = data['chunk']
            content = chunk.content
            if content and len(content) > 0:
                print(content)
                yield content


@app.websocket("/ws")
async def websocket_endpoint(*,websocket: WebSocket,userid: str,permission: Annotated[str, Depends(authenticate)],):
    await manager.connect(websocket)
    try:
        while True:
            text = await websocket.receive_text()

            if 'fail' == permission:
                await manager.send_personal_message(
                    f"authentication failed", websocket
                )
            else:
                if text is not None and len(text) > 0:
                    async for msg in chat(text):
                        await manager.send_personal_message(msg, websocket)

    except WebSocketDisconnect:
        manager.disconnect(websocket)
        print(f"Client #{userid} left the chat")
        await manager.broadcast(f"Client #{userid} left the chat")

if __name__ == '__main__':
    tools = [search]
    llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, max_tokens=512)
    uvicorn.run(app, host='0.0.0.0',port=7777)

客户端:

<!DOCTYPE html>
<html>
    <head>
        <title>Chat</title>
    </head>
    <body>
        <h1>WebSocket Chat</h1>
        <form action="" onsubmit="sendMessage(event)">
            <label>USERID: <input type="text" id="userid" autocomplete="off" value="12345"/></label>
            <label>SECRET: <input type="text" id="secret" autocomplete="off" value="xxxxxxxxxxxxxxxxxxxxxxxxxx"/></label>
            <br/>
            <button onclick="connect(event)">Connect</button>
            <hr>
            <label>Message: <input type="text" id="messageText" autocomplete="off"/></label>
            <button>Send</button>
        </form>
        <ul id='messages'>
        </ul>
        <script>
            var ws = null;
            function connect(event) {
                var userid = document.getElementById("userid")
                var secret = document.getElementById("secret")
                ws = new WebSocket("ws://localhost:7777/ws?userid="+userid.value+"&secret=" + secret.value);
                ws.onmessage = function(event) {
                    var messages = document.getElementById('messages')
                    var message = document.createElement('li')
                    var content = document.createTextNode(event.data)
                    message.appendChild(content)
                    messages.appendChild(message)
                };
                event.preventDefault()
            }
            function sendMessage(event) {
                var input = document.getElementById("messageText")
                ws.send(input.value)
                input.value = ''
                event.preventDefault()
            }
        </script>
    </body>
</html>

调用结果:

用户输入:你好

不需要触发工具调用

模型输出:

用户输入:广州现在天气如何?

需要调用工具

模型输出:

```
Action:
```
{
  "action": "Final Answer",
  "action_input": "广州现在的天气是多云,温度为87华氏度,降水概率为7%,湿度为76%,风力为7英里/小时。"
}
```

PS:

1. 上面仅用于演示流式输出的效果,里面包含一些冗余的信息,例如:"action": "Final Answer",要根据实际情况过滤。

2. 页面输出的样式可以根据实际需要进行调整,此处仅用于演示效果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/765748.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

动手学深度学习 --带你了解chatgpt,跟上AI发展!

本书旨在向读者交付有关深度学习的交互式学习体验。书中不仅阐述深度学习的算法原理&#xff0c;还演示它们的实现和运行。与传统图书不同&#xff0c;本书的每一节都是一个可以下载并运行的 Jupyter记事本&#xff0c;它将文字、公式、图像、代码和运行结果结合在了一起。此外…

【JS】纯web端使用ffmpeg实现的视频编辑器-视频合并

纯前端实现的视频合并 接上篇ffmpeg文章 【JS】纯web端使用ffmpeg实现的视频编辑器 这次主要添加了一个函数&#xff0c;实现了视频合并的操作。 static mergeArgs(timelineList) {const cmd []console.log(时间轴数据,timelineList)console.log("文件1",this.readD…

openGauss真的比PostgreSQL差了10年?

前不久写了MogDB针对PostgreSQL的兼容性文章&#xff0c;我在文中提到针对PostgreSQL而言&#xff0c;MogDB兼容性还是不错的&#xff0c;其中也给出了其中一个能源客户之前POC的迁移报告数据。 But很快我发现总有人回留言喷我&#xff0c;而且我发现每次喷的这帮人是根本不看文…

容器内存

一、容器内存概述 容器本质上还是一个进程&#xff0c;是一个被隔离和限制的进程。因此容器内存和进程内存在表现形式上其实是一样的&#xff0c;这块主要涉及三部分内容&#xff1a;RSS&#xff0c;page cache和swap这三部分&#xff0c;容器基于memory Cgroup对内存进行限制…

Xorbits inference操作实战

1.操作环境 序号软件版本备注1Windows1.版本&#xff1a;Windows 10 专业版2.版本号&#xff1a;21H23.操作系统内部版本&#xff1a;19044.18892Docker Desktop4.24.2 (124339)3WSLUbuntu 22.04 LTS4Python3.105CUDA12.16Dify0.6.6 Xorbits inference 是一个强大且通用的分布…

Python基础001

Python输出语句 print输出字符串 print("中国四大名著&#xff1a;","西游记|","三国演义|","红楼梦|","水浒传") print(6) print(1 1)Python输入语句 input函数 input() input("我的名字是&#xff1a;") p…

在非 antd pro 项目中使用 umi OpenAPI

大家好&#xff0c;我是松柏。自从跟着鱼皮哥使用了ant design pro中的OpenAPI插件之后&#xff0c;我已经无法忍受自己写请求后端接口的方法了&#xff0c;所以这篇文章记录一下如何在非ant design pro项目中使用OpenAPI。 安装依赖 首先我们需要安装包umijs/openapi&#x…

java面试课程-SpringIOC部分源码解析

1.SpringIOC的refresh源码解析 核心&#xff1a; 核心使用的是&#xff1a; 需要完成配置类的解析&#xff0c;各种BeanFactoryProcessor的注册。还有写国际化配置的初始化。Web容器的内部构造。 上面几个方法是refresh方法的内容。注意可以与applicationContext里的内容一起…

Profibus DP主站转Modbus网关连接智能化电表通讯

Profibus DP主站转Modbus网关&#xff08;XD-MDPBM20&#xff09;&#xff0c;是实现不同工业通信协议之间互联互通的设备&#xff0c;主要将Profibus DP协议转换为Modbus协议&#xff0c;实现数据的双向传输。通过Profibus DP主站转Modbus网关&#xff08;XD-MDPBM20&#xff…

大Excel表格76M,电脑16G内存打不开,内存利用率100%虚拟内存占用16G还是卡死提示内存不足,如何才能查看里面内容?

环境: Excel2016 问题描述: 大Excel表格76M,电脑16G内存打不开,内存利用率100%虚拟内存占用16G还是卡死提示内存不足,如何才能查看里面内容? 解决方案: 遇到这种情况,说明Excel文件非常大,超出了你当前计算机配置的处理能力。以下是一些解决方法,帮助你尝试打开或…

代码随想录-Day45

198. 打家劫舍 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋。每间房内都藏有一定的现金&#xff0c;影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统&#xff0c;如果两间相邻的房屋在同一晚上被小偷闯入&#xff0c;系统会自动报警。 给定一个代表每个…

编译原理2

推导和短语 推导 推导过程中&#xff0c;每一步推导都是对句型的 最右非终结符 进行替换&#xff0c;最右推导(规范推导)&#xff1b; 短语 用 β 替换 A&#xff0c;则 β 就是 关于A 的一个短语&#xff1b; 直接短语是短语范围内的一步推导&#xff1b; 直接短语可能不…

基于python的随机森林回归预测+贝叶斯优化超参数前后训练效果对比

目录 1.导入必要的库 2.导入数据与数据预处理 3.查看数据分布 4.特征选择 5.模型建立与训练 6.训练集预测结果 7.模型评估 8.预测新数据 9.贝叶斯优化超参数 1.导入必要的库 # 导入所需的库 from sklearn.model_selection import cross_val_score import pandas as …

Sentinel实现区分来源

要区分来源就要写代码实现RequestOriginParser接口 &#xff0c;注意是要实现com.alibaba.csp.sentinel.adapter.servlet.callback.RequestOriginParser 接口&#xff0c;别搞错接口了。 MyRequestOriginParser.java package com.codex.terry.sentinel.origin;import com.ali…

STM32mp157aaa按键中断实验

效果图&#xff1a; 源码&#xff1a; #include "key.h" void hal_key1_rcc_gpio_init() {// 使能GPIOF组RCC->MP_AHB4ENSETR | (0x1 << 5);// 设置引脚位输入模式GPIOF->MODER & (~(0X3 << 18));GPIOF->MODER & (~(0X3 << 16))…

当Matplotlib遇见SciencePlots

分享一个Matplotlib扩展工具SciencePlots&#xff0c;一行代码绘制science、nature、ieee等要求的图形。 安装 安装SciencePlots # 直接从PyPI安装 pip install SciencePlots 安装latex 如果latex未安装&#xff0c;会报错&#xff1a;RuntimeError: Failed to process st…

【QT开发】乒乓球碰撞反弹demo

在编写代码时&#xff0c;无意弄出来了一个这个东西&#xff0c;觉得挺有意思的记录一下&#xff0c;类似乒乓球在矩形内一直运动碰撞反弹demo 头文件 #ifndef MYPROJECT_H #define MYPROJECT_H#include <QMainWindow> #include <QPainter> #include "form.…

【区块链+基础设施】国家健康医疗大数据科创平台 | FISCO BCOS应用案例

在医疗领域&#xff0c;疾病数据合法合规共享是亟待解决的难题。一方面&#xff0c;当一家医院对患者实施治疗后&#xff0c;若患者转到其 他医院就医&#xff0c;该医院就无法判断诊疗手段是否有效。另一方面&#xff0c;医疗数据属于个人敏感数据&#xff0c;一旦被泄露或被恶…

前端开发中的常见问题及解决方法

前端开发是一个充满挑战和乐趣的领域。然而&#xff0c;在开发过程中&#xff0c;开发者常常会遇到各种各样的问题。本文将介绍一些前端开发中常用或者经常遇到的问题&#xff0c;并提供相应的解决方法&#xff0c;帮助你提高开发效率和解决问题的能力。 一. 页面布局问题 问题…

ArcTs布局入门03——层叠布局(Stack)

如果你也对鸿蒙开发感兴趣&#xff0c;加入“Harmony自习室”吧&#xff01; 扫描下面的二维码关注公众号。 1、概述 叠布局&#xff08;StackLayout&#xff09;用于在屏幕上预留一块区域来显示组件中的元素&#xff0c;提供元素可以重叠的布局。层叠布局通过Stack容器组件实…