AI摘要:最近发现了一个方便自己部署的基于QQNT的QQ机器人项目。使用NONENOT的框架兼容go-cqhttp这个项目。具体的使用步骤包括下载安装llob_install.exe和最新版本的QQ,然后重启QQ。接着按照文档安装NONEBOT,并创建项目。最后设置反向WS和插件,即可运行机器人并使用命令将消息转发到Memos。
最近发现了一个方便自己部署的基于QQNT的QQ机器人项目.
于是有了该文章
项目地址
https://github.com/super1207/install_llob/releases
使用步骤
- 下载llob_install.exe
- 下载最新版本的QQ
- 安装llob_install.exe,重启QQ
参照文档安装NONEBOT
需要准备
Python3.11以上版本
安装脚手架
安装 pipx
python -m pip install --user pipx
python -m pipx ensurepath
安装脚手架
pipx install nb-cli
创建项目
nb create
按照提示操作
- 项目模板[?] 选择一个要使用的模板: bootstrap (初学者或用户)
- 项目名称[?] 项目名称: memos
- 其他选项 请注意,多选项使用空格选中或取消,回车确认。 [?] 要使用哪些驱动器? FastAPI (FastAPI 驱动器)
[?] 要使用哪些适配器? Console (基于终端的交互式适配器)
[?] 立即安装依赖? (Y/n) Yes
[?] 创建虚拟环境? (Y/n) Yes - 选择内置插件[?] 要使用哪些内置插件? echo
运行项目
在项目创建完成后,你可以在项目目录中使用以下命令来运行项目:
nb run
设置反向WS
ws://127.0.0.1:8080/onebot/v11/ws
保存.即可
此处参照文档 https://llonebot.github.io/zh-CN/guide/configuration
插件
给QQ机器人增加发送到memos 的功能
在nonebot的项目memos下
新建bot.py
内容为
import nonebot
from nonebot.adapters.onebot.v11 import Adapter as ONEBOT_V11Adapter
# 初始化 NoneBot
nonebot.init()
# 注册适配器
driver = nonebot.get_driver()
driver.register_adapter(ONEBOT_V11Adapter)
# 在这里加载插件
nonebot.load_builtin_plugins("echo") # 加载内置插件
nonebot.load_from_toml("pyproject.toml") # 从 toml 文件加载插件
# 如果有额外的插件目录,可以这样加载
# nonebot.load_plugins("src/plugins")
if __name__ == "__main__":
nonebot.run()
在pyproject.toml
中加入
plugins = []
plugin_dirs = ["memos/plugins"]
在项目目录下创建memos/plugins文件夹,在其下创建qq_to_memos.py
内容为
from nonebot import on_command, on_message, get_driver
from nonebot.rule import to_me
from nonebot.adapters.onebot.v11 import Bot, Event, Message
from nonebot.params import CommandArg
import json
import os
import httpx
from typing import Dict, Any
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
filename='memos_bot.log',
filemode='a'
)
logger = logging.getLogger(__name__)
# 文件路径
JSON_FILE = "users_data.json"
# 读取 JSON 数据
def read_json() -> Dict[str, Any]:
if os.path.exists(JSON_FILE):
with open(JSON_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
return {}
# 写入 JSON 数据
def write_json(data: Dict[str, Any]):
with open(JSON_FILE, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
# 初始化函数
async def init():
if not os.path.exists(JSON_FILE):
write_json({})
logger.info(f"Created new JSON file: {JSON_FILE}")
# 注册命令
start = on_command("start", rule=to_me(), priority=5)
@start.handle()
async def handle_start(bot: Bot, event: Event, args: Message = CommandArg()):
user_id = event.get_user_id()
token = args.extract_plain_text().strip()
if not token:
await start.finish("请提供 Token,格式:/start <token>")
logger.warning(f"User {user_id} failed to start due to missing token")
return
users_data = read_json()
users_data[user_id] = {"token": token}
write_json(users_data)
logger.info(f"User {user_id} started successfully")
await start.finish("绑定成功!现在您可以直接发送消息,我会将其保存到 Memos。")
# 处理所有消息
memo = on_message(priority=5)
@memo.handle()
async def handle_memo(bot: Bot, event: Event):
user_id = event.get_user_id()
message = event.get_message()
users_data = read_json()
user_info = users_data.get(user_id)
if not user_info:
await memo.finish("您还未绑定,请先使用 /start <token> 命令绑定。")
logger.warning(f"Unstarted user {user_id} attempted to send a memo")
return
token = user_info["token"]
text_content = message.extract_plain_text()
# 如果消息为空,不处理
if not text_content.strip():
return
# 发送到 Memos
async with httpx.AsyncClient() as client:
try:
payload = {
"content": text_content.strip(),
"visibility": "PUBLIC"
}
response = await client.post(
"https://memos.ee/api/v1/memos",
json=payload,
headers={"Authorization": f"Bearer {token}"}
)
response.raise_for_status()
logger.info(f"Memo sent successfully for user {user_id}")
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error occurred for user {user_id}: {e}")
logger.error(f"Response content: {e.response.text}")
await memo.finish(f"发送失败,错误代码:{e.response.status_code},请检查您的 Token 和网络连接。")
return
except Exception as e:
logger.error(f"Unexpected error occurred for user {user_id}: {e}")
await memo.finish("发送过程中发生意外错误,请稍后重试。")
return
await memo.finish("已成功发送到 Memos!")
# 获取驱动器并注册启动事件
driver = get_driver()
driver.on_startup(init)
logger.info("Memos bot plugin initialized")
API端点按自己需求更改即可.
运行机器人
nb run
此时QQ机器人已经正常启动了.
使用机器人
在聊天界面 使用命令
/start token
显示绑定成功
此时发送消息即可转发到memos.且默认为公开的memos.
如需默认为其他状态 需修改 "visibility" 的 值 .
成功演示 https://memos.ee/m/oEBkwymeQTb6fRUeEcA9m5
如果你想尝试一下此功能可以添加QQ机器人
122790336
需添加为好友且 在 https://memos.ee 注册获取token才可以使用.
评论 (0)