8.1 KiB
RSS Feed Bot Posting to Matrix.org
This is the sibling of Feediverse, it's a small Matrix.org client which uses Python's feedparser
library to post RSS and Atom feeds to a Matrix room. I used to run this myself and a few years ago I moved to Matrix's hosted "Neb" Bot solution since it could be configured by others. Well, they went and goofed it up and rather than make every feed I care about adhere 100% to a brittle parser, we'll go back to a known-working solution.
This is a Literate Programming version of my old matrix-feedbot with some features added to it like being able to load some feeds from the feeds.json
endpoint in the Arcology Routing Logic/Arroyo Feed Cache Generator.
Nix Shell setup
# Development shell for the feed bot: a Python 3 interpreter carrying the
# libraries listed below, plus poetry.
# `pkgs` defaults to the <nixpkgs> channel but can be overridden by the caller.
{ pkgs ? import <nixpkgs> {} }:
let
# Python 3 bundled with the packages named in the list.
myPython = pkgs.python3.withPackages (ppkgs:
with ppkgs; [
feedparser
click
pyyaml
matrix-nio
]);
in
# The shell exposes poetry alongside the Python environment defined above.
pkgs.mkShell {
packages = [
pkgs.poetry
myPython
];
}
NEXT I'll convert myPython
to use poetry2nix
later on.
pyproject/poetry definition
I'll be a fool and use poetry for this even though I don't really need to.
# Poetry project metadata for the Matrix feed bot.
[tool.poetry]
name = "matrix-feedbot"
version = "0.0.0"
description = "Send RSS feeds to a Matrix room"
license = "GPL-3.0-only"
# Authors use the "Name <email>" form; the closing ">" was missing.
authors = [
    "Ryan Rix <code@whatthefuck.computer>",
]
packages = [
    { include = "matrix_feedbot" },
]

# Runtime dependencies.
# NOTE(review): the script imports aiohttp directly but it is not declared
# here; presumably it arrives transitively via matrix-nio -- confirm, and
# consider declaring it explicitly.
[tool.poetry.dependencies]
python = "^3.10"
matrix-nio = "^0.20.2"
feedparser = "^6.0.10"
pyyaml = "^6.0"
NEXT In theory this could support e2e but lol, it's fine. Maybe later.
The Script Itself
import asyncio
import calendar
import datetime
import getpass
import html
import os
import time # i hate python!!
from typing import Dict

import aiohttp
import feedparser
import yaml
from nio import AsyncClient, MatrixRoom, RoomMessageText
def stub_config(path: str) -> Dict:
    """Interactively build a starter configuration and write it to *path*.

    Prompts on the terminal for the Matrix username, password, homeserver
    URL and a first room ID, in that order. The feed lists start out empty.

    Args:
        path: Where the YAML configuration file is written.

    Returns:
        The configuration dict that was written.
    """
    cfg = dict(
        credentials=dict(
            username=input("Enter the Matrix username: "),
            # getpass keeps the password from being echoed to the terminal.
            password=getpass.getpass("Enter the Matrix password: "),
            homeserver=input("Enter the Matrix homeserver URL: "),
        ),
        rooms=[
            input("Enter the first room ID to join (starts with ! not #): "),
        ],
        feeds=[],
        dynamic_urls=[],
    )
    # The file holds a plaintext password, so create it readable and
    # writable by the owner only instead of with the default permissions.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        # Only plain dicts, lists and strings are written, so the safe
        # dumper is sufficient and matches the safe_load used when reading.
        yaml.safe_dump(cfg, f)
    return cfg
def load_config() -> Dict:
    """Return the bot configuration, creating it interactively if absent.

    The file location comes from the FEEDBOT_CONFIG environment variable
    and falls back to ./cfg.yaml. An existing file is parsed as YAML; a
    missing one is produced by stub_config(), whose result is returned.
    """
    cfg_path = os.environ.get("FEEDBOT_CONFIG", "./cfg.yaml")
    if os.path.exists(cfg_path):
        with open(cfg_path, 'r') as handle:
            return yaml.safe_load(handle)
    # No configuration yet: ask the user and write a fresh one.
    return stub_config(cfg_path)
class Worker():
    """Poll RSS/Atom feeds and announce new entries in Matrix rooms.

    Configuration is read through load_config(). A per-feed set of already
    announced entry links is kept in a YAML cache file (FEEDBOT_CACHE,
    default ./cache.yaml) so entries are not announced twice.
    """

    # Entries whose timestamp is at least this old are never announced.
    MAX_ENTRY_AGE = datetime.timedelta(hours=24)
    # Minimum time between two refreshes of all feeds.
    REFRESH_INTERVAL = datetime.timedelta(hours=1)

    def __init__(self):
        self.config = load_config()
        self.client = AsyncClient(
            self.config["credentials"]["homeserver"],
            self.config["credentials"]["username"],
        )
        self.cache = self.load_cache()
        # Epoch start guarantees the first loop iteration refreshes.
        self.last_fetch = datetime.datetime.fromtimestamp(0)

    @staticmethod
    def _cache_path():
        """Return the cache file location (FEEDBOT_CACHE or ./cache.yaml)."""
        return os.environ.get("FEEDBOT_CACHE", "./cache.yaml")

    # TODO: make async
    def persist_cache(self):
        """Write the seen-links cache to the cache file as YAML."""
        with open(self._cache_path(), 'w') as f:
            return yaml.safe_dump(self.cache, f)

    def load_cache(self):
        """Return the seen-links cache as a dict of feed URL -> set of links.

        Every configured feed starts with an empty set; values stored in
        the cache file, if it exists, replace those defaults.
        """
        cache = {feed: set() for feed in self.config.get("feeds") or []}
        path = self._cache_path()
        if os.path.exists(path):
            with open(path, 'r') as f:
                # An empty file parses to None; treat it as "nothing stored".
                stored = yaml.safe_load(f) or {}
            # Coerce to sets so .add() works whatever shape was stored.
            cache.update(
                (url, set(links or ())) for url, links in stored.items()
            )
        return cache

    async def message_callback(self, room: "MatrixRoom", event: "RoomMessageText") -> None:
        """Handle an incoming room text message (currently a no-op)."""
        pass

    @staticmethod
    def _anchor(url, label):
        """Return an HTML link to *url* labelled *label*, escaping both.

        When *url* is empty only the escaped label is returned.
        """
        safe_label = html.escape(label, quote=False)
        if not url:
            return safe_label
        return f'<a href="{html.escape(url, quote=True)}">{safe_label}</a>'

    @classmethod
    def _format_message(cls, feed, entry):
        """Build the announcement for *entry* of the parsed *feed*.

        Args:
            feed: Parsed feed mapping; its "feed" mapping may carry
                "title" and "link".
            entry: Entry mapping with a "link" and optionally a "title".

        Returns:
            A ``(text, html_body)`` tuple: the plain-text body and the
            HTML formatted body.
        """
        meta = feed["feed"]
        feed_link = meta.get("link", "")
        feed_title = meta.get("title") or feed_link or "(untitled feed)"
        entry_link = entry["link"]
        entry_title = entry.get("title") or entry_link
        text = f"New in {feed_title}: {entry_link} - {entry_title}"
        # Titles and links come from remote feeds, so they are escaped
        # before being placed in markup.
        html_body = (
            f"New in {cls._anchor(feed_link, feed_title)}: "
            f"{cls._anchor(entry_link, entry_title)}"
        )
        return text, html_body

    async def send_message(self, feed, entry):
        """Announce *entry* of the parsed *feed* in every configured room."""
        text, html_body = self._format_message(feed, entry)
        print(text)
        print(html_body)
        # TODO: configure which rooms a feed goes to.
        # TODO: configure how often to scrape a feed.
        for room_id in self.config['rooms']:
            print(await self.client.room_send(
                room_id=room_id,
                message_type="m.room.message",
                content={
                    "msgtype": "m.text",
                    "format": "org.matrix.custom.html",
                    "formatted_body": html_body,
                    "body": text,
                },
            ))

    async def fetch(self, session, url):
        """GET *url* with *session* and return the response body as text.

        Raises:
            aiohttp.ClientResponseError: If the server answers with an
                HTTP error status.
        """
        async with session.get(url) as response:
            # Do not hand an error page to the feed/YAML parsers.
            response.raise_for_status()
            return await response.text()

    async def _fetch_feed_safely(self, feed):
        """Process one feed, reporting network failures instead of raising."""
        try:
            await self.fetch_feed_url(feed)
        except (aiohttp.ClientError, asyncio.TimeoutError) as err:
            # One unreachable feed must not stop the other feeds or the bot.
            print(f"failed to fetch {feed}: {err!r}")

    async def maybe_fetch_feeds(self):
        """
        Fetch and announce every statically configured feed.

        greetz to https://stackoverflow.com/questions/23847555/asynchronous-feedparser-requests
        """
        for feed in self.config.get("feeds") or []:
            await self._fetch_feed_safely(feed)

    async def maybe_fetch_dynamic_feeds(self):
        """Fetch each configured feed-list URL and process the feeds it names.

        Each listing is expected to be a sequence of mappings that carry a
        "url" key.
        """
        for index_url in self.config.get("dynamic_urls") or []:
            try:
                async with aiohttp.ClientSession() as session:
                    listing = await self.fetch(session, index_url)
            except (aiohttp.ClientError, asyncio.TimeoutError) as err:
                print(f"failed to fetch feed list {index_url}: {err!r}")
                continue
            # it's json and json is yaml so hahaha
            feeds = yaml.safe_load(listing) or []
            for feed in feeds:
                await self._fetch_feed_safely(feed['url'])

    @staticmethod
    def _entry_time(entry):
        """Return the entry's timestamp as an aware UTC datetime, or None.

        None is returned when the entry carries no parsed date or the date
        is before 1970.
        """
        parsed = entry.get("updated_parsed") or entry.get("published_parsed")
        if parsed is None or parsed.tm_year <= 1969:
            return None
        # feedparser documents its *_parsed tuples as UTC, so they are
        # converted with timegm(); mktime() would read them as local time.
        return datetime.datetime.fromtimestamp(
            calendar.timegm(parsed), tz=datetime.timezone.utc
        )

    @classmethod
    def _is_fresh(cls, entry, seen, now=None):
        """Return True if *entry* should be announced.

        Args:
            entry: Entry mapping from the parsed feed.
            seen: Set of links already announced for this feed.
            now: Aware datetime used as "now"; defaults to the current UTC
                time.
        """
        link = entry.get("link")
        if not link or link in seen:
            # Without a link the entry cannot be de-duplicated.
            return False
        stamp = cls._entry_time(entry)
        if stamp is None:
            return False
        if now is None:
            now = datetime.datetime.now(datetime.timezone.utc)
        return now - stamp < cls.MAX_ENTRY_AGE

    async def fetch_feed_url(self, feed):
        """Download *feed*, announce its fresh entries and record them.

        Fresh entries are those not yet in the cache for this feed and
        newer than MAX_ENTRY_AGE; they are announced oldest first.
        """
        async with aiohttp.ClientSession() as session:
            data = await self.fetch(session, feed)
        rss = feedparser.parse(data)
        print(f"got {feed} w/ {len(rss['entries'])}")
        # Feeds that were never seeded in the cache get an empty set here.
        seen = self.cache.setdefault(feed, set())
        now = datetime.datetime.now(datetime.timezone.utc)
        fresh = [e for e in rss['entries'] if self._is_fresh(e, seen, now)]
        fresh.sort(key=self._entry_time)
        for entry in fresh:
            link = str(entry['link'])
            if link in seen:
                # The same link appeared twice within this fetch.
                continue
            await self.send_message(rss, entry)
            seen.add(link)

    async def main(self) -> None:
        """Log in, join the configured rooms and run the poll loop forever.

        Feeds are refreshed whenever REFRESH_INTERVAL has elapsed since the
        last refresh; between checks the client syncs with the homeserver.
        The client is closed when the loop exits for any reason.
        """
        self.client.add_event_callback(self.message_callback, RoomMessageText)
        try:
            print(await self.client.login(self.config["credentials"]["password"]))
            for room_id in self.config["rooms"]:
                await self.client.join(room_id)
            while True:
                td = datetime.datetime.now() - self.last_fetch
                if td > self.REFRESH_INTERVAL:
                    print("refreshing...")
                    await self.maybe_fetch_feeds()
                    await self.maybe_fetch_dynamic_feeds()
                    self.persist_cache()
                    self.last_fetch = datetime.datetime.now()
                    print("ahh...")
                await self.client.sync(timeout=30 * 1000)
        finally:
            # Release the client's HTTP session on errors and cancellation.
            await self.client.close()
# Build the worker first (this may prompt for configuration on the terminal),
# then let asyncio.run() create the event loop, run the bot and clean up the
# loop on exit. This replaces asyncio.get_event_loop(), whose implicit loop
# creation is deprecated.
w = Worker()
asyncio.run(w.main())
NEXT detangle
NEXT feed -> roomlist mappings? so that my dev feeds can also go to my devlog rooms
NEXT commands in message_callback
INPROGRESS tag feeds based on fetch recency rather than using a sleep so that the commands don't block
- State "INPROGRESS" from "NEXT" [2023-04-01 Sat 00:23]