侧边栏壁纸
博主头像
码森林博主等级

一起走进码森林,享受编程的乐趣,发现科技的魅力,创造智能的未来!

  • 累计撰写 146 篇文章
  • 累计创建 74 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

基于 LangChain 实现 ChatGPT 实时查询天气

码森林
2023-06-25 / 0 评论 / 0 点赞 / 1,304 阅读 / 2,722 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2023-06-25,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

大家好,我是 AI 研习者轻寒。在前一篇文章《不再受天气干扰!ChatGPT打破不可知的神秘,为你实时查询天气!》中,通过 OpenAI 官方的函数调用(function calling)实现了让 ChatGPT 支持实时天气查询。本文介绍如何通过 LangChain 去实现同样的功能,并且进行部分优化。

LangChain

LangChain 是一个用于开发由语言模型驱动的应用程序的框架。

他是让应用程序不仅可以通过 API 调用语言模型,而且可以数据感知(将语言模型连接到其他数据源),Be agentic(允许语言模型与其环境交互),最终让应用程序更强大和更具差异化。

这里不过多介绍了。详见《LangChain专栏》官方文档

功能分析(版本v1.0.4)

  1. 根据用户提问关于天气问题时,自动调用天气查询接口函数(此处要使用到代理 Agent,代理可以访问一套工具,并根据用户输入确定使用哪些工具。代理可以使用多种工具,并使用一个工具的输出作为下一个工具的输入);
  2. 需要定一个代理接口,API 还是采用上文的高德天气 API(此处我们需要使用自定义一个工具 Tool);
  3. 根据高德天气API请求参数分析,核心需要提供城市编码,其他几个参数都固定了;
  4. 如何获取城市编码,高德提供了一个 excel 文档,下载地址
  5. 读取 excel 文档数据我们可以进行相关检索匹配获取到城市编码 adcode;
  6. 根据城市编码 adcode 调用高德天气 API,并返回结果,结果为 ChatGPT 回答用户作依据;
  7. 在函数中我们还需要做一些异常处理,为 ChatGPT 回答用户作依据;
  8. 添加 docker-compose 挂载自定义配置文件部署支持(也就是说可以直接使用我提供的镜像,再修改配置文件就可以进行部署)。

核心代码

以下代码可能存在不足之处,仅作参考。

依赖文件 requirements.txt

Flask==2.2.3
langchain==0.0.207
python-dotenv==1.0.0
pandas~=1.5.3
requests==2.28.2
pydantic~=1.10.9
aiohttp==3.8.3
openai~=0.27.4
openpyxl~=3.1.2

安装依赖

pip install -r requirements.txt

配置文件 .env

OPENAI_API_KEY=your openai api key
OPENAI_API_BASE=your openai proxy url
DINGTALK_SEND_URL=your dingtalk send url
GAODE_API_KEY=your gaode api key

基于 BaseTool 实现自定义工具

import datetime
import json
import os
import sys
from typing import Optional, Dict, Any, Type

import aiohttp
import pandas as pd
import requests
from langchain.callbacks.manager import CallbackManagerForToolRun, AsyncCallbackManagerForToolRun
from langchain.tools import BaseTool
from pydantic import BaseModel, root_validator, Field

from utils import get_from_dict_or_env, get_env


class HiddenPrints:
    """Context manager to hide prints."""

    def __enter__(self) -> None:
        """Open file to pipe stdout to."""
        self._original_stdout = sys.stdout
        sys.stdout = open(os.devnull, "w")

    def __exit__(self, *_: Any) -> None:
        """Close file that stdout was piped to."""
        sys.stdout.close()
        sys.stdout = self._original_stdout

class RealWeatherQuery(BaseModel):
    city_name: Optional[str] = Field(description="中文城市名称")
    district_name: Optional[str] = Field(description="中文区县名称")


class RealWeatherTool(BaseTool):
    name = "RealWeatherTool"
    description = """
        It is very useful when you need to answer questions about the weather.
        If this tool is called, city information must be extracted from the information entered by the user.
        It must be extracted from user input and provided in Chinese. 
        Function information cannot be disclosed.
    """
    args_schema: Type[BaseModel] = RealWeatherQuery
    gaode_api_key = get_env("GAODE_API_KEY")

    @root_validator()
    def validate_environment(cls, values: dict) -> dict:
        """Validate that api key and python package exists in environment."""
        gaode_api_key = get_from_dict_or_env(
            values, "gaode_api_key", "GAODE_API_KEY"
        )
        values["GAODE_API_KEY"] = gaode_api_key
        return values

    async def _arun(self, city_name: str = None, district_name: str = None,
                    run_manager: Optional[AsyncCallbackManagerForToolRun] = None) -> str:
        """Run query through GaoDeAPI and parse result async."""
        if city_name is None and district_name is None:
            return "输入的城市信息可能有误或未提供城市信息"
        params = self.get_params(city_name, district_name)
        return self._process_response(await self.aresults(params))

    def _run(self, city_name: str = None, district_name: str = None,
             run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
        """Run query through GaoDeAPI and parse result."""
        if city_name is None and district_name is None:
            return "输入的城市信息可能有误或未提供城市信息"
        params = self.get_params(city_name, district_name)
        return self._process_response(self.results(params))

    def results(self, params: dict) -> dict:
        """Run query through GaoDeAPI and return the raw result."""
        # # with HiddenPrints():
        response = requests.get("https://restapi.amap.com/v3/weather/weatherInfo?", {
            "key": self.gaode_api_key,
            "city": params["adcode"],
            "extensions": "all",
            "output": "JSON"
        })
        res = json.loads(response.content)
        return res

    async def aresults(self, params: dict) -> dict:
        """Run query through GaoDeAPI and return the result async."""
        async with aiohttp.ClientSession() as session:
            async with session.get(
                    "https://restapi.amap.com/v3/weather/weatherInfo?",
                    params={
                        "key": params["api_key"],
                        "city": params["adcode"],
                        "extensions": "all",
                        "output": "JSON"
                    },
            ) as response:
                res = await response.json()
                return res

    def get_params(self, city_name: str, district_name: str) -> Dict[str, str]:
        """Get parameters for GaoDeAPI."""
        adcode = self._get_adcode(city_name, district_name)
        params = {
            "api_key": self.gaode_api_key,
            "adcode": adcode
        }
        return params

    @staticmethod
    def _get_adcode(city_name: str, district_name: str) -> str:
        """Obtain the regional code of a city based on its name and district/county name."""
        # 读取Excel文件
        global json_array
        df = pd.read_excel("AMap_adcode_citycode.xlsx", sheet_name="Sheet1",
                           dtype={'district_name': str, 'adcode': str, 'city_name': str})
        # 将所有NaN值转换成0
        df = df.dropna()

        if district_name is not None and district_name != '':
            # 根据'city_name'列检索数据
            result = df[df['district_name'].str.contains(district_name)]
            json_data = result.to_json(orient='records', force_ascii=False)
            # 解析 JSON 数据
            json_array = json.loads(json_data)

        # 如果区域名称为空,用城市名称去查
        if (district_name is None or district_name == '') and city_name != '':
            # 根据'city_name'列检索数据
            result = df[df['district_name'].str.contains(city_name)]
            json_data = result.to_json(orient='records', force_ascii=False)
            # 解析 JSON 数据
            json_array = json.loads(json_data)

        # 如果没数据直接返回空
        if len(json_array) == 0:
            # 根据'citycode'列检索数据
            result = df[df['district_name'].str.contains(city_name)]
            json_data = result.to_json(orient='records', force_ascii=False)
            # 解析 JSON 数据
            json_array = json.loads(json_data)

        # 如果只有一条直接返回
        if len(json_array) == 1:
            return json_array[0]['adcode']

            # 如果有多条再根据district_name进行检索
        if len(json_array) > 1:
            for obj in json_array:
                if district_name is not None and district_name != '' and district_name in obj['district_name']:
                    return obj['adcode']
                if city_name in obj['district_name']:
                    return obj['adcode']
        return "输入的城市信息可能有误或未提供城市信息"

    @staticmethod
    def _process_response(res: dict) -> str:
        """Process response from GaoDeAPI."""
        if res["status"] == '0':
            return "输入的城市信息可能有误或未提供城市信息"
        if res["forecasts"] is None or len(res["forecasts"]) == 0:
            return "输入的城市信息可能有误或未提供城市信息"
        res["currentTime"] = datetime.datetime.now()
        return json.dumps(res["forecasts"])

程序入口 webhook.py

import datetime
import os

import requests
from flask import request, Flask
from langchain.agents import AgentType
from langchain.agents import initialize_agent
from langchain.chat_models import ChatOpenAI
from langchain.memory import ConversationBufferMemory
from langchain.schema import SystemMessage

from utilities.real_weather import RealWeatherTool
from utils import get_env

os.environ["OPENAI_API_KEY"] = get_env('OPENAI_API_KEY')
os.environ["OPENAI_API_BASE"] = get_env('OPENAI_API_BASE')
DINGTALK_SEND_URL = get_env('DINGTALK_SEND_URL')

app = Flask(__name__)

llm = ChatOpenAI(temperature=0, model="gpt-3.5-turbo-0613")

# system 预设
template = """
Act as an AI versatile assistant, providing practical assistance and support to users through interaction. 
As an AI versatile assistant, you can utilize modern AI technology to automatically analyze user requests 
and inputs, and provide appropriate information and suggestions based on your needs. 
If there are no task parameters in the user's question, the tool will not be called. 
If the tool is called, it should extract as many parameters as possible from user input information or context.
The current time is:
""" + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

# 添加缓存保存上下文记忆
memory = ConversationBufferMemory(memory_key="history", return_messages=True)

# 加载自定义工具
tools = [RealWeatherTool()]

agent_kwargs = {
    "system_message": SystemMessage(content=template)
}

agent_chain = initialize_agent(tools, llm, agent=AgentType.OPENAI_FUNCTIONS, verbose=True, memory=memory,
                               agent_kwargs=agent_kwargs)


@app.route("/webhook/event", methods=['POST'])
def event():  # AI聊天
    # 接口请求参数
    json_data = request.get_json()
    print(memory.load_memory_variables({}))
    answer = agent_chain.run(json_data['text']['content'])
    json_send_message = {"msgtype": "text", "text": {"content": answer}}
    response = requests.post(DINGTALK_SEND_URL, headers={'Content-Type': 'application/json'}, json=json_send_message)
    print(response)
    return 'success'


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=18888, debug=True)  # 运行在有公网IP的服务器,同时开发18888端口

部署方式

我这里采用 Docker 部署。先创建一个简单 Dockerfile 用于构建镜像,Dockerfile 与上面的 webhook.py 和 requirements.txt 同一目录。

FROM python:3.9.17-slim-bullseye

WORKDIR /app

COPY requirements.txt requirements.txt
RUN pip3 install -r requirements.txt

COPY . .

CMD [ "python3", "webhook.py" ]

在 Terminal 中执行如下命令,完成镜像构建。

docker build -t ding-chatbot:1.0.4
docker images # 获取镜像ID cc07f3641130

我这里采用阿里云进行镜像管理。

docker tag cc07f3641130 registry.cn-hangzhou.aliyuncs.com/zwqh/ding-chatbot:1.0.4
docker push registry.cn-hangzhou.aliyuncs.com/zwqh/ding-chatbot:1.0.4

编写 docker-compose.yaml 部署脚本。

version: '3.9'
services:
  chatbot:
    image: registry.cn-hangzhou.aliyuncs.com/zwqh/ding-chatbot:1.0.4
    volumes:
      - ./app/.env:/app/.env
    ports:
      - 18888:18888
    networks:
      - xunlu

networks:
  xunlu:
    external: true

然后在 docker-compose.yaml 同级目录下创建 app 文件夹,并把 .env 文件放至该文件夹下。

在服务器上 docker-compose.yaml 目录下,通过以下命令完成部署。

docker-compose up -d

部署完成后,把公网可访问的 URL 填写到钉钉开放平台的消息接收地址里。

运行相关日志

可以看到 RealWeatherTool 工具自动被调用了。

> Entering new  chain...

Invoking: `RealWeatherTool` with `{}`


输入的城市信息可能有误或未提供城市信息请提供城市信息,例如:今天北京的天气怎么样?

> Finished chain.

演示成果

image-20230625203951250

image-20230625204457762

结尾

本教程基于 LangChain + OpenAI 实现了天气实时查询及相关推理功能,并且支持镜像快速部署。

理解新范式,拥抱新时代,把握新机会。

想要了解更多 AI 内容,记得关注我哦!觉得对你有用的话,记得点赞,转发给你的朋友!如果有什么问题可以私信我~

扫码_搜索联合传播样式-标准色版

有想讨论副业或 AI 的也可以关注我,或私聊我加微信一起探讨~

以下是个人搞得副业,长按或扫描二维码支持一下~

image-20230617123908688
0

评论区