使用python监控nodeseek-rss识别关键词进行文章推送飞书+tg
自己也在nodeseek搜了搜,发现不少人都在使用,但没找到自己需要的简单版本,于是自己弄了个。
python3 请求rss地址并解析后进行关键词检查,有提到关键词则推送,没有关键词不处理。
我设置的定时是5分钟一次,不至于错过新文章,也不会频繁访问nodeseek获取rss导致IP异常。
且每次推送的地址记录到sqlite数据库,不至于一篇文章推送多次的情况。
代码:
#!/usr/bin/env python3 import feedparser import requests import json import sqlite3 from datetime import datetime from pathlib import Path配置参数
RSS_URL = “https://www.nodeseek.com/rss.xml” KEYWORDS = [‘12.6’,‘抽奖’,‘免费’,‘free’,‘抽’,‘bagevm’,‘原生’,‘原生IP’,‘联通快乐’,‘奖’]
飞书配置
FEISHU_WEBHOOK = ‘https://open.feishu.cn/open-apis/bot/v2/hook/你的token’
Telegram 配置
TELEGRAM_BOT_TOKEN = ‘机器人token’ TELEGRAM_CHAT_ID = ‘你的TG-id’ # 例如 ‘123456789’
数据库文件路径
DB_PATH = Path(file).parent / “processed_entries.db”
def init_db(): “”“初始化数据库,创建表(如果不存在)”“” with sqlite3.connect(DB_PATH) as conn: conn.execute(“”” CREATE TABLE IF NOT EXISTS processed_entries ( link TEXT PRIMARY KEY, processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) “”“)
def is_processed(link): “”“检查文章是否已处理过”“” with sqlite3.connect(DB_PATH) as conn: cursor = conn.cursor() cursor.execute(“SELECT 1 FROM processed_entries WHERE link = ?”, (link,)) return cursor.fetchone() is not None
def mark_as_processed(link): “”“标记文章为已处理”“” with sqlite3.connect(DB_PATH) as conn: conn.execute(“INSERT OR IGNORE INTO processed_entries (link) VALUES (?)”, (link,))
def check_feeds(): try: # 1. 获取 RSS 内容(添加编码处理) response = requests.get(RSS_URL) response.encoding = ‘utf-8’ # 强制使用UTF-8解码 if response.status_code != 200: print(f”RSS请求失败: {response.status_code}“) return
# 2. 解析 XML(添加字符编码声明) feed = feedparser.parse(response.content) # 使用response.content代替text if feed.bozo: print(f"RSS解析错误: {feed.bozo_exception}") return # 3. 处理每篇文章 for entry in feed.entries: if is_processed(entry.link): continue # 跳过已处理文章 content = (entry.title + ' ' + getattr(entry, 'summary', '')).lower() matched_keywords = [kw for kw in KEYWORDS if kw.lower() in content] if matched_keywords: message = format_message(entry.title, entry.link, matched_keywords) send_to_feishu(message) # 飞书推送 send_to_telegram(message) # Telegram 推送 mark_as_processed(entry.link) # 标记为已处理 except Exception as e: print(f"处理RSS源时出错: {str(e)}")def format_message(title, link, keywords): “”“格式化推送消息”“” return f”发现关键词: 【{‘, ‘.join(keywords)}】\n\n标题: {title}\n链接: {link}”
def send_to_feishu(message): “”“飞书推送”“” payload = { “msg_type”: “text”, “content”: {“text”: message} } try: response = requests.post( FEISHU_WEBHOOK, headers={‘Content-Type’: ‘application/json’}, data=json.dumps(payload), timeout=10 ) print(“飞书已推送”) except Exception as e: print(f”飞书推送异常: {str(e)}“)
def send_to_telegram(message): “”“Telegram 推送”“” url = f”https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage” payload = { “chat_id”: TELEGRAM_CHAT_ID, “text”: message, “disable_web_page_preview”: False } try: response = requests.post( url, headers={‘Content-Type’: ‘application/json’}, data=json.dumps(payload), timeout=10 ) print(“Telegram已推送” if response.status_code == 200 else f”Telegram推送失败: {response.text}“) except Exception as e: print(f”Telegram推送异常: {str(e)}“)
if name == ‘main’: init_db() # 确保数据库已初始化 check_feeds()
执行效果:


想法:
突然想到可以给TG机器人弄个后端,然后使用命令来进行交互,比如增删关键词,比如更深入的一些访问文章屏蔽某些用户的发帖。就是不知道飞书机器人支不支持。
注意:
执行python3的时候会需要导入库,具体方法自行ai即可。正文结束
还没有评论,来坐沙发吧。