以下是根据需求编写的Python代码,用于调用和风天气API获取临泉天气并通过企业微信Webhook推送:

import requests

QWEATHER_API_KEY = "你的和风天气API_KEY"  # 替换为你的Key
WEBHOOK_URL = "你的企业微信群机器人Webhook_URL"  # 替换为Webhook地址

# 1. 获取临泉的LocationID
def get_linquan_id():
    geo_url = "https://geoapi.qweather.com/v2/city/lookup"
    params = {
        "location": "临泉",
        "key": QWEATHER_API_KEY,
        "adm": "安徽",
        "mode": "exact"
    }
    try:
        response = requests.get(geo_url, params=params)
        response.raise_for_status()
        res = response.json()
        print("地理API响应:", res)
        if res.get("code") != "200":
            raise Exception(f"API错误: {res.get('message', '未知错误')}")
        if not res.get("location"):
            raise Exception("未找到临泉地点信息")
        return res["location"][0]["id"]
    except requests.exceptions.RequestException as e:
        raise Exception(f"请求失败: {str(e)}")

# 2. 获取实时天气
def get_realtime_weather(location_id):
    weather_url = "https://devapi.qweather.com/v7/weather/now"
    params = {
        "location": location_id,
        "key": QWEATHER_API_KEY
    }
    try:
        response = requests.get(weather_url, params=params)
        response.raise_for_status()
        res = response.json()
        print("天气API响应:", res)
        if res.get("code") != "200":
            raise Exception(f"API错误: {res.get('message', '未知错误')}")
        return res["now"]
    except requests.exceptions.RequestException as e:
        raise Exception(f"请求失败: {str(e)}")

# 3. 推送消息到企业微信(确保此函数存在且缩进正确!)
def push_to_wecom(content):
    headers = {"Content-Type": "application/json"}
    data = {
        "msgtype": "markdown",
        "markdown": {
            "content": content
        }
    }
    res = requests.post(WEBHOOK_URL, headers=headers, json=data)
    if res.json().get("errcode") == 0:
        print("推送成功")
    else:
        print(f"推送失败: {res.text}")

# 主流程(必须在函数定义之后调用)
if __name__ == "__main__":
    try:
        city_id = get_linquan_id()
        weather = get_realtime_weather(city_id)

        msg = f"""**临泉实时天气**  
        >🌡️温度:{weather["temp"]}℃  
        >🌤️体感:{weather["feelsLike"]}℃  
        >🌈天气:{weather["text"]}  
        >💨风力:{weather["windScale"]}级({weather["windDir"]})  
        >💧湿度:{weather["humidity"]}%  
        [查看更多](https://www.qweather.com)"""

        push_to_wecom(msg)  # 调用推送函数

    except Exception as e:
        print(f"执行失败: {str(e)}")

以下是根据需求编写的Python代码,用于调用和风天气API获取嘉定天气并通过企业微信Webhook推送:

import requests

QWEATHER_API_KEY = "你的和风天气API_KEY"  # 替换为你的Key
WEBHOOK_URL = "你的企业微信群机器人Webhook_URL"  # 替换为Webhook地址

# 1. 获取嘉定区的LocationID
def get_location_id():
    geo_url = "https://geoapi.qweather.com/v2/city/lookup"
    params = {
        "location": "嘉定区",
        "adm": "上海",  # 指定上级行政区为上海
        "key": QWEATHER_API_KEY,
        "mode": "exact"
    }
    try:
        response = requests.get(geo_url, params=params)
        response.raise_for_status()
        res = response.json()
        print("地理API响应:", res)
        if res.get("code") != "200":
            raise Exception(f"API错误: {res.get('message', '未知错误')}")
        if not res.get("location"):
            raise Exception("未找到地点信息")
        return res["location"][0]["id"]
    except requests.exceptions.RequestException as e:
        raise Exception(f"请求失败: {str(e)}")

# 2. 获取实时天气(函数无需修改)
def get_realtime_weather(location_id):
    weather_url = "https://devapi.qweather.com/v7/weather/now"
    params = {
        "location": location_id,
        "key": QWEATHER_API_KEY
    }
    try:
        response = requests.get(weather_url, params=params)
        response.raise_for_status()
        res = response.json()
        print("天气API响应:", res)
        if res.get("code") != "200":
            raise Exception(f"API错误: {res.get('message', '未知错误')}")
        return res["now"]
    except requests.exceptions.RequestException as e:
        raise Exception(f"请求失败: {str(e)}")

# 3. 推送消息到企业微信(无需修改)
def push_to_wecom(content):
    headers = {"Content-Type": "application/json"}
    data = {
        "msgtype": "markdown",
        "markdown": {
            "content": content
        }
    }
    res = requests.post(WEBHOOK_URL, headers=headers, json=data)
    if res.json().get("errcode") == 0:
        print("推送成功")
    else:
        print(f"推送失败: {res.text}")

# 主流程(修改消息标题)
if __name__ == "__main__":
    try:
        city_id = get_location_id()  # 函数名称同步修改
        weather = get_realtime_weather(city_id)

        # 构造消息内容(更新标题为嘉定区)
        msg = f"""**上海市嘉定区实时天气**  
        >🌡️温度:{weather["temp"]}℃  
        >🌤️体感:{weather["feelsLike"]}℃  
        >🌈天气:{weather["text"]}  
        >💨风力:{weather["windScale"]}级({weather["windDir"]})  
        >💧湿度:{weather["humidity"]}%  
        [查看更多](https://www.qweather.com)"""

        push_to_wecom(msg)

    except Exception as e:
        print(f"执行失败: {str(e)}")

使用说明:

  1. 获取必要参数

    • 和风天气API Key:注册和风天气,在控制台创建项目获取API Key
    • 企业微信Webhook:
      • 在企业微信客户端右键群聊 -> 添加机器人
      • 创建后复制Webhook URL(包含key=参数)
  2. 安装依赖

    pip install requests
  3. 代码修改

    • 将代码中的你的和风天气API_KEY替换为你的和风天气API Key
    • 你的企业微信群机器人Webhook_URL替换为企业微信机器人Webhook地址
  4. 扩展功能建议

    • 添加定时任务(通过cron或Windows计划任务)
    • 增加异常重试机制
    • 添加更多天气信息(如未来预报)
    • 使用环境变量管理敏感信息
作者:君君  创建时间:2025-03-09 20:46
最后编辑:君君  更新时间:2025-03-11 10:32