complete-computing-environment/matrix-feedbot.org

17 KiB

RSS Feed Bot Posting to Matrix.org

This is the sibling of Feediverse: a small Matrix.org client which uses Python's feedparser library to post RSS and Atom feeds to a Matrix room. I used to run this myself, and a few years ago I moved to Matrix's hosted "Neb" bot solution since it could be configured by others. Well, they went and goofed it up, and rather than make every feed I care about adhere 100% to a brittle parser, we'll go back to a known-working solution.

This is a Literate Programming version of my old matrix-feedbot with some features added to it like being able to load some feeds from the feeds.json endpoint in the Arcology Routing Logic/Arroyo Feed Cache Generator.

The Script Itself

The Worker Class and Loop

Look, this thing is pretty un-exciting. It's a while loop with a bunch of scaffolding to persist configuration, fetch feeds, and send messages. Each of those functionalities is broken out below with the core logical loop remaining in place here.

import asyncio
import aiohttp
import feedparser
import os
from typing import Dict
import datetime
import time # i hate python!!

from nio import AsyncClient, MatrixRoom, RoomMessageText

<<YamlBackedDict>>

class Worker():
    def __init__(self):
        path = os.environ.get("FEEDBOT_CONFIG", "./cfg.yaml")
        cred_path = os.environ.get("FEEDBOT_CREDENTIALS_CONFIG", "./creds.yaml")
        cache_path = os.environ.get("FEEDBOT_CACHE", "./cache.yaml")

        self.config      = YamlBackedDict(path,      self.stub_config)
        self.credentials = YamlBackedDict(cred_path, self.stub_creds)

        cache = { feed: set() for feed in self.config["feeds"] }
        self.cache = YamlBackedDict(cfg_path=cache_path, stub_fn=self.stub_cache)
        self.cache._dict = {**cache, **self.cache._dict}

        self.client = AsyncClient(self.credentials["homeserver"],
                                  self.credentials["username"])
        self.last_fetch = datetime.datetime.fromtimestamp(0)


    async def main(self) -> None:
        await self.login()
        self.client.add_event_callback(self.message_callback, RoomMessageText)

        while True:
            td = datetime.datetime.now() - self.last_fetch
            if td > datetime.timedelta(hours=1):
                print("refreshing...")
                await self.maybe_fetch_feeds()
                await self.maybe_fetch_dynamic_feeds()
                self.cache.save()
                self.last_fetch = datetime.datetime.now()
                print("ahh...")
            await self.client.sync(timeout=30*1000)


    <<stub-functions>>
    <<message>>
    <<feed-fetch>>


    async def login(self) -> None:
        if self.credentials.get("access_token") is None:
            login_resp = await self.client.login(self.credentials["password"])
            self.credentials["access_token"] = login_resp.access_token
            self.credentials["device_id"] = login_resp.device_id
            self.credentials.save()
        else:
            self.client.access_token = self.credentials["access_token"]
            self.client.device_id = self.credentials["device_id"]

        for room_id in self.config["rooms"]:
            await self.client.join(room_id)


def run():
    """Console-script entry point: build a Worker and run it until it stops."""
    async def _main():
        # Construct the Worker inside the running loop so anything it creates
        # that binds to an event loop binds to the one asyncio.run() manages.
        await Worker().main()

    # asyncio.run() replaces the deprecated get_event_loop().run_until_complete()
    # pattern and also closes the loop on exit.
    asyncio.run(_main())

Fetch and Parse Feeds Asynchronously

These functions, starting with fetch_feed_url, form an async pipeline which handles everything from fetching and parsing a feed through to sending the message. maybe_fetch_dynamic_feeds will reach out to the Arcology's feeds.json endpoint and iterate over each of the feeds listed there, and maybe_fetch_feeds will iterate over the statically defined ones.

async def maybe_fetch_feeds(self):
    """Fetch every statically configured feed, one after another.

    A feed that times out or fails at the HTTP layer is reported and skipped
    so that one unreachable site does not stop the remaining feeds (or the
    whole bot) from being processed.
    """
    for feed in self.config["feeds"]:
        try:
            await self.fetch_feed_url(feed)
        except asyncio.TimeoutError as err:
            print(f"timed out fetching {feed}: {err!r}")
        except aiohttp.ClientError as err:
            print(f"failed to fetch {feed}: {err!r}")


async def maybe_fetch_dynamic_feeds(self):
    """Fetch the feeds advertised by each configured "dynamic_urls" endpoint.

    Each endpoint is expected to return a JSON list of objects carrying a
    'url' key. Every listed feed gets a cache slot (if it has none yet) and
    is then processed exactly like a static feed. Network failures are
    reported and skipped per endpoint and per feed.
    """
    async def attempt(awaitable, what):
        """Await *awaitable*, reporting network failures instead of raising."""
        try:
            return await awaitable
        except asyncio.TimeoutError as err:
            print(f"timed out fetching {what}: {err!r}")
        except aiohttp.ClientError as err:
            print(f"failed to fetch {what}: {err!r}")
        return None

    async def fetch_listing(url):
        """Download the raw listing; the session is closed before returning."""
        async with aiohttp.ClientSession() as session:
            return await self.fetch(session, url)

    # A config without the key (or with an empty value) simply has no
    # dynamic feeds.
    for feed_config in self.config.get('dynamic_urls') or []:
        data = await attempt(fetch_listing(feed_config), feed_config)
        if data is None:
            continue

        # it's json and json is yaml so hahaha
        listing = yaml.safe_load(data) or []
        for feed in listing:
            url = feed['url']
            if self.cache.get(url) is None:
                self.cache[url] = set()
            await attempt(self.fetch_feed_url(url), url)

async def fetch_feed_url(self, feed):
    """
    Download one feed, post its fresh entries and remember what was posted.

    An entry is fresh when it carries a parseable update time less than 24
    hours in the past and its link is not yet in the cache for this feed.
    Fresh entries are posted oldest-first, and the cache is saved after every
    post.

    greetz to https://stackoverflow.com/questions/23847555/asynchronous-feedparser-requests
    """
    async with aiohttp.ClientSession() as session:
        data = await self.fetch(session, feed)
    # Parsing and posting do not need the HTTP session any more.
    rss = feedparser.parse(data)

    print(f"got {feed} w/ {len(rss['entries'])}")

    # Make sure there is a set of seen links for this feed even if the caller
    # did not seed one.
    seen = self.cache.get(feed)
    if not isinstance(seen, set):
        seen = set(seen or ())
        self.cache[feed] = seen

    now = datetime.datetime.now(datetime.timezone.utc)
    max_age = datetime.timedelta(hours=24)

    def filter_old_entries(entry):
        stamp = entry.get('updated_parsed')
        link = entry.get('link')
        if stamp is None or not link:
            # Undated or unlinkable entries can neither be aged nor
            # de-duplicated, so they are never posted.
            return False
        try:
            # feedparser's *_parsed values are UTC, so build an aware UTC
            # datetime instead of going through time.mktime(), which would
            # interpret the tuple as local time.
            entry_time = datetime.datetime(*stamp[:6], tzinfo=datetime.timezone.utc)
        except (TypeError, ValueError):
            return False
        return now - entry_time < max_age and str(link) not in seen

    entries_filtered = filter(filter_old_entries, rss['entries'])
    entries_sorted = sorted(entries_filtered, key=lambda entry: entry['updated_parsed'])
    for entry in entries_sorted:
        await self.send_message(rss, entry)
        seen.add(str(entry['link']))
        #  if a post is made, we need to save the cache so that we don't spam our friends
        self.cache.save()


async def fetch(self, session, url):
    """Return the body of *url* as text, requested through *session*."""
    request = session.get(url)
    async with request as reply:
        body = await reply.text()
    return body

Message Formatting and Sending

async def message_callback(self, room: MatrixRoom, event: RoomMessageText) -> None:
    """Handle an incoming room text message; currently a deliberate no-op."""
    return None

async def send_message(self, feed, entry):
    """Announce *entry* of the parsed *feed* in every configured room.

    Args:
        feed: parsed feed; its 'feed' mapping supplies the 'title' and 'link'.
        entry: one entry of that feed; must carry 'link', may carry 'title'.

    The message is sent both as plain text ("body") and as HTML
    ("formatted_body"). Titles and URLs come from remote feeds, so they are
    HTML-escaped before being placed into the markup.
    """
    # Function-scope import: only this function needs it.
    from html import escape

    feed_meta = feed['feed']
    feed_link = feed_meta.get('link')
    feed_title = str(feed_meta.get('title') or feed_link or '')
    entry_link = str(entry['link'])
    # Fall back to the link when the entry has no (or an empty) title.
    entry_title = str(entry.get('title') or entry_link)

    def anchor(url, label):
        """Return an escaped <a> element, or just the label without a URL."""
        label = escape(label, quote=False)
        if not url:
            return label
        return f'<a href="{escape(str(url), quote=True)}">{label}</a>'

    # construct message
    html_body = f"New in {anchor(feed_link, feed_title)}: {anchor(entry_link, entry_title)}"
    text = f"New in {feed_title}: {entry_link} - {entry_title}"

    print(text)
    print(html_body)
    # TKTKTK configure which rooms a feed goes tooo.
    # TKTKTK configure how often to scrape a feed
    for room_id in self.config['rooms']:
        print(await self.client.room_send(
            room_id=room_id,
            message_type="m.room.message",

            content={"msgtype": "m.text",
                     "format": "org.matrix.custom.html",
                     "formatted_body": html_body,
                     "body": text },
        ))

YamlBackedDict

I have this stupid config structure I made which is just used to load and persist a YAML file:

import yaml

class YamlBackedDict():
    """A minimal dict-like wrapper whose contents are persisted as YAML.

    On construction the file at *cfg_path* is loaded; when it does not exist,
    *stub_fn* is called to produce the initial contents, which are written to
    the file straight away. Changes are only written back by an explicit
    call to save().
    """

    def __init__(self, cfg_path: str, stub_fn):
        """
        Args:
            cfg_path: path of the YAML file backing this mapping.
            stub_fn: zero-argument callable returning the initial contents.
        """
        self.path = cfg_path
        self.stub_generator = stub_fn
        self._dict = self.load()

    def __setitem__(self, k, v):
        self._dict[k] = v

    def __getitem__(self, k):
        return self._dict[k]

    def __contains__(self, k):
        # Without this, `k in obj` would fall back to integer indexing via
        # __getitem__ and fail with a confusing KeyError(0).
        return k in self._dict

    def __str__(self):
        return str(self._dict)

    def get(self, k, default=None):
        """Return the value for *k*, or *default* when the key is absent."""
        return self._dict.get(k, default)

    def stub(self):
        """Generate the initial contents, write them to disk and return them."""
        cfg = self.stub_generator()

        with open(self.path, 'w') as f:
            # safe_dump keeps the file loadable by the safe_load in load().
            yaml.safe_dump(cfg, f)

        return cfg

    def load(self):
        """Return the file's contents, stubbing the file if it is missing."""
        if not os.path.exists(self.path):
            cfg = self.stub()
        else:
            with open(self.path, 'r') as f:
                cfg = yaml.safe_load(f)
            if cfg is None:
                # An empty YAML document loads as None; treat it as an empty
                # mapping so item access keeps working.
                cfg = {}

        return cfg

    def save(self):
        """Write the current contents back to the backing file."""
        with open(self.path, 'w') as f:
            return yaml.safe_dump(self._dict, f)

The YAML files each have an un-documented schema because I am an asshole; because I am not an asshole, there is a function which will stub each of them when they do not exist. These are passed in to the constructor of YamlBackedDict and listed as part of the Worker class, but if I do more work on this they should probably be sub-classes.

def stub_creds(ybd) -> Dict:
    """Interactively ask for the Matrix credentials.

    Returns:
        A dict with the keys "username", "password" and "homeserver".
    """
    import getpass
    import sys

    # Do not echo the password on an interactive terminal; keep plain input()
    # when stdin is piped so scripted setups still work.
    read_secret = getpass.getpass if sys.stdin.isatty() else input

    username = input("Enter the Matrix username: ")
    password = read_secret("Enter the Matrix password: ")
    homeserver = input("Enter the Matrix homeserver URL: ")
    return dict(
        username=username,
        password=password,
        homeserver=homeserver,
    )


def stub_config(ybd) -> Dict:
    """Ask for the first room to join and return a fresh configuration.

    The result has a single-element "rooms" list and empty "feeds" and
    "dynamic_urls" lists.
    """
    first_room = input("Enter the first room ID to join (starts with ! not #): ")
    return {
        "rooms": [first_room],
        "feeds": [],
        "dynamic_urls": [],
    }


def stub_cache(ybd) -> Dict:
    """Return the initial (empty) cache contents."""
    # would be nice if this could include the set() logic in __init__ but...
    return {}

pyproject/poetry definition

I'll be a fool and use poetry for this even though I don't really need to.

[tool.poetry]
name = "matrix-feedbot"
version = "0.0.0"
description = "Send RSS feeds to a Matrix room"
# NOTE(review): the Nix package below declares lib.licenses.agpl3Only (and
# version 0.0.1) -- confirm which license/version is intended and align them.
license = "GPL-3.0-only"
authors = [
  "Ryan Rix <code@whatthefuck.computer>"
]

packages = [
  { include = "matrix_feedbot" }
]

[tool.poetry.dependencies]
python = "^3.10"
matrix-nio = "^0.20"
feedparser = "^6.0.10"
pyyaml = "^6.0"
# The bot imports aiohttp directly, so declare it instead of relying on it
# arriving transitively through matrix-nio.
aiohttp = "^3.8"

[tool.poetry.scripts]
feedbot = 'matrix_feedbot.feedbot:run'

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

NEXT In theory this could support e2e but lol, it's fine. Maybe later.

Deploy to The Wobserver

  • State "DONE" from "INPROGRESS" [2023-04-17 Mon 10:58]
  • State "INPROGRESS" from "NEXT" [2023-04-11 Tue 14:50]

Nix Shell setup

# Development shell: poetry plus a Python interpreter carrying the bot's
# dependencies.
{ pkgs ? import <nixpkgs> {} }:

let
  myPython = pkgs.python3.withPackages (ppkgs:
    with ppkgs; [
      # imported directly by the bot, so listed explicitly rather than
      # relying on matrix-nio to pull it in
      aiohttp
      feedparser
      click
      pyyaml
      matrix-nio
    ]);
in
pkgs.mkShell {
  packages = [
    pkgs.poetry
    myPython
  ];
}

Package feedbot in rixpkgs

{ lib,
  fetchFromGitHub,
  python3Packages,
  callPackage,
}:

python3Packages.buildPythonPackage {
  pname = "matrix-feedbot";

  # NOTE(review): pyproject.toml says 0.0.0 -- confirm which is intended.
  version = "0.0.1";
  # NOTE(review): absolute path on the author's machine; this only evaluates
  # where that checkout exists.
  src = /home/rrix/Code/feedbot2;

  format = "pyproject";

  # Runtime dependencies; aiohttp is imported directly by the bot.
  propagatedBuildInputs = with python3Packages; [ matrix-nio feedparser pyyaml aiohttp ];
  # poetry-core is the PEP 517 build backend: it runs at build time on the
  # build platform, so it belongs in nativeBuildInputs.
  nativeBuildInputs = with python3Packages; [ poetry-core ];

  # checkInputs = with python3Packages; [];

  meta = {
    homepage = "https://cce.whatthefuck.computer/matrix-feedbot";
    description = "Post RSS/Atom feeds to Matrix";
    # NOTE(review): pyproject.toml declares GPL-3.0-only -- confirm.
    license = lib.licenses.agpl3Only;
    maintainers = with lib.maintainers; [ rrix ];
  };
}

Deploy in Arroyo NixOS

This is set up to take options; in theory some day someone else could use this… but for now it's mostly for my own benefit.

# NixOS module running matrix-feedbot as a systemd service under its own
# system user. The bot's configuration file is generated from the options
# below; credentials and cache live in mutable files on disk.
{ lib, pkgs, config, ... }:

with lib;

let
  cfg = config.services.feedbot;
in
{
  options.services.feedbot = {
    enabled = mkOption {
      type = types.bool;
      default = true;
      description = "Whether to run the feedbot service.";
    };

    package = mkOption {
      type = types.package;
      default = pkgs.matrix-feedbot;
      description = "The matrix-feedbot package providing bin/feedbot.";
    };

    workDir = mkOption {
      type = types.str;
      default = "/srv/feedbot";
      description = "Home directory of the feedbot user.";
    };

    credentialsFileLocation = mkOption {
      type = types.path;
      default = "/srv/feedbot/creds.yaml";
      description = "Path of the YAML file holding the Matrix credentials.";
    };

    cacheFileLocation = mkOption {
      type = types.path;
      default = "/srv/feedbot/cache.yaml";
      description = "Path of the YAML file recording already-posted links.";
    };

    rooms = mkOption {
      type = types.listOf types.str;
      default = [ "!THQSEcCQbqCZqLGUbG:kickass.systems" ];
      description = "Matrix room IDs to join and post to.";
    };

    dynamicFeedUrls = mkOption {
      type = types.listOf types.str;
      default = [ "https://thelionsrear.com/feeds.json" ];
      description = "URLs of feeds.json endpoints listing further feeds.";
    };

    feedUrls = mkOption {
      type = types.listOf types.str;
      default = [
        <<mkFeeds()>>
      ];
      description = "Statically configured feed URLs.";
    };
  };

  # Everything, including the user and group, is only created when the
  # service is enabled.
  config = mkIf cfg.enabled {
    users.groups.feedbot = {};
    users.users.feedbot = {
      createHome = true;
      home = cfg.workDir;
      group = "feedbot";
      isSystemUser = true;
    };
    systemd.services.feedbot = {
      enable = true;
      description = "Post RSS and Atom feeds to a Matrix room.";
      # The bot talks to the homeserver and the feeds immediately, so wait
      # for the network to actually be up, not just for network.target.
      wants = [ "network-online.target" ];
      after = [ "network-online.target" ];
      script = "${cfg.package}/bin/feedbot";
      wantedBy = [ "multi-user.target" ];

      environment = {
        FEEDBOT_CONFIG = pkgs.writeTextFile {
          name = "feedbot.config.yaml";
          text = generators.toYAML {} {
            feeds = cfg.feedUrls;
            dynamic_urls = cfg.dynamicFeedUrls;
            rooms = cfg.rooms;
          };
        };
        FEEDBOT_CREDENTIALS_CONFIG = cfg.credentialsFileLocation;
        FEEDBOT_CACHE = cfg.cacheFileLocation;
      };

      serviceConfig = {
        RestartSec = 5;
        Restart = "on-failure";
        User = "feedbot";
      };
    };
  };
}