17 KiB
RSS Feed Bot Posting to Matrix.org
This is the sibling of Feediverse: a small Matrix.org client which uses Python's feedparser
library to post RSS and Atom feeds to a Matrix room. I used to run this myself, and a few years ago I moved to Matrix's hosted "Neb" bot solution since it could be configured by others. Well, they went and goofed it up, and rather than make every feed I care about adhere 100% to a brittle parser, we'll go back to a known-working solution.
This is a Literate Programming version of my old matrix-feedbot with some features added to it like being able to load some feeds from the feeds.json
endpoint in the Arcology Routing Logic/Arroyo Feed Cache Generator.
Feeds to Post
This is embedded in the configuration.nix which deploys the feed bot… As always, I am up to Literate Programming shenanigans.
;; Render the rows of `tbl' as one double-quoted string per line.
(->> tbl
;; keep only the first cell of every row
(--map (first it))
;; wrap each value in double quotes
(--map (format "\"%s\"" it))
;; one quoted value per output line
(s-join "\n"))
The Script Itself
The Worker Class and Loop
Look, this thing is pretty un-exciting. It's a while
loop with a bunch of scaffolding to persist configuration, fetch feeds, and send messages. Each of those functionalities is broken out below with the core logical loop remaining in place here.
import asyncio
import aiohttp
import feedparser
import os
from typing import Dict
import datetime
import time # i hate python!!
from nio import AsyncClient, MatrixRoom, RoomMessageText
<<YamlBackedDict>>
class Worker():
def __init__(self):
path = os.environ.get("FEEDBOT_CONFIG", "./cfg.yaml")
cred_path = os.environ.get("FEEDBOT_CREDENTIALS_CONFIG", "./creds.yaml")
cache_path = os.environ.get("FEEDBOT_CACHE", "./cache.yaml")
self.config = YamlBackedDict(path, self.stub_config)
self.credentials = YamlBackedDict(cred_path, self.stub_creds)
cache = { feed: set() for feed in self.config["feeds"] }
self.cache = YamlBackedDict(cfg_path=cache_path, stub_fn=self.stub_cache)
self.cache._dict = {**cache, **self.cache._dict}
self.client = AsyncClient(self.credentials["homeserver"],
self.credentials["username"])
self.last_fetch = datetime.datetime.fromtimestamp(0)
async def main(self) -> None:
await self.login()
self.client.add_event_callback(self.message_callback, RoomMessageText)
while True:
td = datetime.datetime.now() - self.last_fetch
if td > datetime.timedelta(hours=1):
print("refreshing...")
await self.maybe_fetch_feeds()
await self.maybe_fetch_dynamic_feeds()
self.cache.save()
self.last_fetch = datetime.datetime.now()
print("ahh...")
await self.client.sync(timeout=30*1000)
<<stub-functions>>
<<message>>
<<feed-fetch>>
async def login(self) -> None:
if self.credentials.get("access_token") is None:
login_resp = await self.client.login(self.credentials["password"])
self.credentials["access_token"] = login_resp.access_token
self.credentials["device_id"] = login_resp.device_id
self.credentials.save()
else:
self.client.access_token = self.credentials["access_token"]
self.client.device_id = self.credentials["device_id"]
for room_id in self.config["rooms"]:
await self.client.join(room_id)
def run():
    """
    Console-script entry point: build a Worker and run its main loop.

    Uses asyncio.run() rather than asyncio.get_event_loop(), which is
    deprecated when no event loop is running.
    """
    async def _main() -> None:
        # Build the Worker inside the running loop so every asyncio object
        # it creates belongs to the loop that actually drives it.
        await Worker().main()

    asyncio.run(_main())
Fetch and Parse Feeds Asynchronously
These functions, starting with fetch_feed_url, are async methods which handle feed fetching and parsing all the way through sending the message. maybe_fetch_dynamic_feeds
will reach out to the Arcology's feeds.json endpoint and iterate over each of the feeds listed there, and maybe_fetch_feeds
will iterate over the statically defined ones.
async def maybe_fetch_feeds(self):
    """Fetch every feed listed under ``feeds`` in the config, one after another."""
    for url in self.config["feeds"]:
        await self.fetch_feed_url(url)
async def maybe_fetch_dynamic_feeds(self):
    """
    Fetch every feed advertised by the configured ``dynamic_urls`` endpoints.

    Each endpoint is downloaded and parsed into an iterable of mappings with
    a ``url`` key; every such URL gets a cache slot (if it has none yet) and
    is then fetched like a statically configured feed.
    """
    for index_url in self.config['dynamic_urls']:
        async with aiohttp.ClientSession() as http:
            raw = await self.fetch(http, index_url)
            listing = yaml.safe_load(raw) # it's json and json is yaml so hahaha
            for item in listing:
                # Keep an existing "already posted" set, otherwise start an empty one.
                self.cache[item['url']] = self.cache.get(item['url'], set())
                await self.fetch_feed_url(item['url'])
async def fetch_feed_url(self, feed):
    """
    Fetch one feed URL, then post and remember every fresh entry.

    An entry is fresh when it has a link, carries a parsed updated date less
    than 24 hours old, and its link is not yet in the cache for ``feed``.
    Fresh entries are sent oldest-first through ``self.send_message``; the
    cache is saved after each one.

    greetz to https://stackoverflow.com/questions/23847555/asynchronous-feedparser-requests
    """
    # Function-scope import: calendar.timegm() reads a struct_time as UTC,
    # whereas time.mktime() reads it as local time. feedparser documents its
    # *_parsed dates as UTC, so mktime skewed the 24h window by the UTC offset.
    import calendar

    async with aiohttp.ClientSession() as session:
        data = await self.fetch(session, feed)
    rss = feedparser.parse(data)
    print(f"got {feed} w/ {len(rss['entries'])}")

    # Tolerate a feed that has no cache slot yet instead of raising KeyError.
    seen = self.cache.get(feed)
    if seen is None:
        seen = set()
        self.cache[feed] = seen

    max_age = datetime.timedelta(hours=24).total_seconds()
    now = time.time()

    def filter_old_entries(entry):
        stamp = entry.get('updated_parsed')
        link = entry.get('link')
        # Entries with no usable date or link cannot be age-checked or
        # posted; skip them rather than crash the whole refresh.
        if stamp is None or link is None or stamp.tm_year <= 1969:
            return False
        return now - calendar.timegm(stamp) < max_age and link not in seen

    entries_filtered = filter(filter_old_entries, rss['entries'])
    entries_sorted = sorted(entries_filtered, key=lambda entry: entry['updated_parsed'])
    for entry in entries_sorted:
        await self.send_message(rss, entry)
        seen.add(str(entry['link']))
        # if a post is made, we need to save the cache so that we don't spam our friends
        self.cache.save()
async def fetch(self, session, url):
    """GET ``url`` through ``session`` and return the response body as text."""
    async with session.get(url) as resp:
        body = await resp.text()
    return body
Message Formatting and Sending
async def message_callback(self, room: MatrixRoom, event: RoomMessageText) -> None:
    """Receive a room text message; the bot deliberately does nothing with it."""
    return None
async def send_message(self, feed, entry):
    """
    Announce one feed entry in every configured room.

    Args:
        feed: the parsed feed; ``feed['feed']['link']`` and
            ``feed['feed']['title']`` are read.
        entry: one entry of that feed; ``entry['link']`` is required and
            ``title`` falls back to the link when absent.

    Sends an ``m.text`` message carrying both a plain-text body and an HTML
    ``formatted_body``.
    """
    feed_meta = feed['feed']
    title = entry.get('title', entry.get('link'))

    # construct message
    # NOTE(review): titles and links are interpolated into HTML unescaped;
    # confirm the feed parser's sanitizing is sufficient for untrusted feeds.
    html = ''.join([
        'New in <a href="', feed_meta['link'], '">',
        feed_meta['title'],
        '</a>: <a href="', entry['link'], '">', title, '</a>'
    ])
    # The plain-text body previously read "New in<title>": the separating
    # space after "New in" was missing.
    text = ''.join([
        'New in ', feed_meta['title'],
        ': ', entry['link'], " - ", title
    ])
    print(text)
    print(html)

    # TKTKTK configure which rooms a feed goes tooo.
    # TKTKTK configure how often to scrape a feed
    for room_id in self.config['rooms']:
        print(await self.client.room_send(
            room_id=room_id,
            message_type="m.room.message",
            content={"msgtype": "m.text",
                     "format": "org.matrix.custom.html",
                     "formatted_body": html,
                     "body": text },
        ))
YamlBackedDict
I have this stupid config structure i made which is just used to load and persist a YAML file:
import yaml
class YamlBackedDict():
    """
    A small dict-like wrapper around a YAML file.

    The file is read once at construction; when it does not exist,
    ``stub_fn`` is called to produce the initial content, which is written
    out immediately. Changes are only persisted by calling ``save()``.
    """

    def __init__(self, cfg_path: str, stub_fn):
        """
        Args:
            cfg_path: path of the backing YAML file.
            stub_fn: zero-argument callable returning the initial content,
                used when the file does not exist.
        """
        self.path = cfg_path
        self.stub_generator = stub_fn
        self._dict = self.load()

    def __setitem__(self, k, v):
        self._dict[k] = v

    def __getitem__(self, k):
        return self._dict[k]

    def __str__(self):
        return str(self._dict)

    def get(self, k, default=None):
        """Return the value stored under ``k``, or ``default`` when absent."""
        return self._dict.get(k, default)

    def stub(self):
        """Generate the initial content, write it to the file, and return it."""
        cfg = self.stub_generator()
        with open(self.path, 'w') as f:
            # safe_dump, matching save() and load(): anything written here
            # must be readable by yaml.safe_load on the next start.
            yaml.safe_dump(cfg, f)
        return cfg

    def load(self):
        """Return the file's content, stubbing the file first when it is missing."""
        if not os.path.exists(self.path):
            cfg = self.stub()
        else:
            with open(self.path, 'r') as f:
                cfg = yaml.safe_load(f)
            # An empty file (e.g. truncated by an interrupted save) parses
            # to None; treat it as an empty mapping so dict operations on
            # the loaded content keep working.
            if cfg is None:
                cfg = {}
        return cfg

    def save(self):
        """Write the current content back to the file."""
        with open(self.path, 'w') as f:
            return yaml.safe_dump(self._dict, f)
The YAML files each have an un-documented schema because I am an asshole; because I am not an asshole, there is a function which will stub each of them when it does not exist. These functions are passed in to the constructor of YamlBackedDict
and are defined as part of the Worker
class, but if I do more work on this they should probably be sub-classes.
def stub_creds(ybd) -> Dict:
    """
    Interactively ask for the Matrix credentials used to seed the credentials file.

    Returns:
        A dict with ``username``, ``password`` and ``homeserver`` keys.
    """
    # Function-scope import: the password is read without echoing it to the
    # terminal, unlike input().
    from getpass import getpass

    return dict(
        username=input("Enter the Matrix username: "),
        password=getpass("Enter the Matrix password: "),
        homeserver=input("Enter the Matrix homserver URL: "),
    )
def stub_config(ybd) -> Dict:
    """
    Interactively build the initial bot configuration.

    Returns:
        A dict with one prompted room id under ``rooms`` and empty ``feeds``
        and ``dynamic_urls`` lists.
    """
    first_room = input("Enter the first room ID to join (starts with ! not #): ")
    return {
        "rooms": [first_room],
        "feeds": [],
        "dynamic_urls": [],
    }
def stub_cache(ybd) -> Dict:
    """Return the initial, empty cache content."""
    # would be nice if this could include the set() logic in __init__ but...
    return {}
pyproject/poetry definition
I'll be a fool and use poetry for this even though I don't really need to.
[tool.poetry]
name = "matrix-feedbot"
# NOTE(review): the Nix package for this project declares version 0.0.1 and an
# AGPL-3.0-only license; confirm which values are intended and align them.
version = "0.0.0"
description = "Send RSS feeds to a Matrix room"
license = "GPL-3.0-only"
authors = [
"Ryan Rix <code@whatthefuck.computer>"
]
packages = [
{ include = "matrix_feedbot" }
]
[tool.poetry.dependencies]
python = "^3.10"
matrix-nio = "^0.20"
# The bot imports aiohttp directly, so declare it instead of relying on it
# arriving as a transitive dependency.
aiohttp = "^3.8"
feedparser = "^6.0.10"
pyyaml = "^6.0"
[tool.poetry.scripts]
# Console script: `feedbot` calls run() in matrix_feedbot/feedbot.py.
feedbot = 'matrix_feedbot.feedbot:run'
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
NEXT In theory this could support e2e but lol, it's fine. Maybe later.
Deploy to The Wobserver
- State "DONE" from "INPROGRESS" [2023-04-17 Mon 10:58]
- State "INPROGRESS" from "NEXT" [2023-04-11 Tue 14:50]
Nix Shell setup
# Development shell: Poetry plus a Python interpreter that carries the
# libraries listed below.
{ pkgs ? import <nixpkgs> {} }:

pkgs.mkShell {
  packages = [
    pkgs.poetry
    (pkgs.python3.withPackages (ps: [
      ps.feedparser
      ps.click
      ps.pyyaml
      ps.matrix-nio
    ]))
  ];
}
Package feedbot in rixpkgs
# Nix package for the feed bot, built from its pyproject.toml with poetry-core.
{ lib,
  fetchFromGitHub,
  python3Packages,
  callPackage,
}:

python3Packages.buildPythonPackage {
  pname = "matrix-feedbot";
  version = "0.0.1";
  # NOTE(review): absolute path on the author's machine; this only evaluates
  # where that checkout exists. Consider a fetcher once the source is published.
  src = /home/rrix/Code/feedbot2;
  format = "pyproject";

  # poetry-core is the build backend: a build-time tool, so it belongs in
  # nativeBuildInputs rather than buildInputs.
  nativeBuildInputs = with python3Packages; [ poetry-core ];

  # aiohttp is listed explicitly because the bot imports it directly.
  propagatedBuildInputs = with python3Packages; [ matrix-nio aiohttp feedparser pyyaml ];

  # checkInputs = with python3Packages; [];

  meta = {
    homepage = "https://cce.whatthefuck.computer/matrix-feedbot";
    description = "Post RSS/Atom feeds to Matrix";
    # NOTE(review): pyproject.toml declares GPL-3.0-only; confirm which
    # license is intended and align the two.
    license = lib.licenses.agpl3Only;
    maintainers = with lib.maintainers; [ rrix ];
  };
}
Deploy in Arroyo NixOS
This is set up to take options; in theory some day someone else could use this… but for now it's mostly for my own benefit.
# NixOS module for the feed bot: declares its options, a dedicated system
# user, and the systemd unit that runs it.
{ lib, pkgs, config, ... }:

with lib;

let
  # Shorthand for this module's own option values.
  cfg = config.services.feedbot;

  # The bot's YAML configuration, rendered from the options below.
  configFile = pkgs.writeTextFile {
    name = "feedbot.config.yaml";
    text = generators.toYAML {} {
      feeds = cfg.feedUrls;
      dynamic_urls = cfg.dynamicFeedUrls;
      rooms = cfg.rooms;
    };
  };
in
{
  options.services.feedbot = {
    enabled = mkOption {
      type = types.bool;
      default = true;
    };

    package = mkOption {
      type = types.package;
      default = pkgs.matrix-feedbot;
    };

    # Used as the home directory of the feedbot user.
    workDir = mkOption {
      type = types.str;
      default = "/srv/feedbot";
    };

    credentialsFileLocation = mkOption {
      type = types.path;
      default = "/srv/feedbot/creds.yaml";
    };

    cacheFileLocation = mkOption {
      type = types.path;
      default = "/srv/feedbot/cache.yaml";
    };

    rooms = mkOption {
      type = types.listOf types.str;
      default = [ "!THQSEcCQbqCZqLGUbG:kickass.systems" ];
    };

    dynamicFeedUrls = mkOption {
      type = types.listOf types.str;
      default = [ "https://thelionsrear.com/feeds.json" ];
    };

    feedUrls = mkOption {
      type = types.listOf types.str;
      default = [
        <<mkFeeds()>>
      ];
    };
  };

  config = {
    users.groups.feedbot = {};

    users.users.feedbot = {
      createHome = true;
      home = cfg.workDir;
      group = "feedbot";
      isSystemUser = true;
    };

    systemd.services.feedbot = mkIf cfg.enabled {
      enable = true;
      description = "Post RSS and Atom feeds to a Matrix room.";
      after = [ "network.target" ];
      script = "${cfg.package}/bin/feedbot";
      wantedBy = [ "default.target" ];

      # The three files the bot reads, handed over as environment variables.
      environment = {
        FEEDBOT_CONFIG = configFile;
        FEEDBOT_CREDENTIALS_CONFIG = cfg.credentialsFileLocation;
        FEEDBOT_CACHE = cfg.cacheFileLocation;
      };

      serviceConfig = {
        RestartSec = 5;
        Restart = "on-failure";
        User = "feedbot";
      };
    };
  };
}