complete-computing-environment/matrix-feedbot.org

8.1 KiB

RSS Feed Bot Posting to Matrix.org

This is the sibling of Feediverse: a small Matrix.org client which uses Python's feedparser library to post RSS and Atom feeds to a Matrix room. I used to run this myself, and a few years ago I moved to Matrix's hosted "Neb" Bot solution since it could be configured by others. Well, they went and goofed it up, and rather than make every feed I care about adhere 100% to a brittle parser, we'll go back to a known-working solution.

This is a Literate Programming version of my old matrix-feedbot with some features added to it like being able to load some feeds from the feeds.json endpoint in the Arcology Routing Logic/Arroyo Feed Cache Generator.

Nix Shell setup

# Development shell for the feed bot: Poetry plus a Python 3 interpreter
# bundled with the libraries listed below.
# `pkgs` defaults to the <nixpkgs> channel and may be overridden by the caller.
{ pkgs ? import <nixpkgs> {} }:

let
  # Python 3 environment with the listed packages available on its import path.
  myPython = pkgs.python3.withPackages (ppkgs:
    with ppkgs; [
      feedparser
      click
      pyyaml
      matrix-nio
    ]);
in
pkgs.mkShell {
  # Tools placed on PATH inside the shell.
  packages = [
    pkgs.poetry
    myPython
  ];
}

NEXT I'll convert myPython to a poetry2nix later on.

pyproject/poetry definition

I'll be a fool and use poetry for this even though I don't really need to.

# Poetry project definition for the feed bot.
[tool.poetry]
name = "matrix-feedbot"
version = "0.0.0"
description = "Send RSS feeds to a Matrix room"
license = "GPL-3.0-only"
# Authors must be in "Name <email>" form; the closing ">" was missing.
authors = [
  "Ryan Rix <code@whatthefuck.computer>"
]

packages = [
  { include = "matrix_feedbot" }
]

[tool.poetry.dependencies]
python = "^3.10"
matrix-nio = "^0.20.2"
# The script imports aiohttp directly, so declare it explicitly instead of
# relying on it arriving as a transitive dependency of matrix-nio.
aiohttp = "^3.8"
feedparser = "^6.0.10"
pyyaml = "^6.0"

NEXT In theory this could support e2e but lol, it's fine. Maybe later.

The Script Itself

import asyncio
import calendar
import datetime
import getpass
import html
import os
import time # i hate python!!
from typing import Dict

import aiohttp
import feedparser
import yaml
from nio import AsyncClient, MatrixRoom, RoomMessageText


def stub_config(path: str) -> Dict:
    """Interactively build a starter configuration and write it to ``path``.

    Prompts on the terminal for the Matrix credentials and the first room
    to join, then serialises the resulting mapping as YAML.

    Args:
        path: Destination of the YAML configuration file.

    Returns:
        The configuration mapping that was written, with the keys
        ``credentials``, ``rooms``, ``feeds`` and ``dynamic_urls``.
    """
    cfg = dict(
        credentials=dict(
            username=input("Enter the Matrix username: "),
            # getpass avoids echoing the password to the terminal.
            password=getpass.getpass("Enter the Matrix password: "),
            homeserver=input("Enter the Matrix homeserver URL: "),
        ),
        rooms=[
            input("Enter the first room ID to join (starts with ! not #): "),
        ],
        feeds=[],
        dynamic_urls=[],
    )
    # The file holds a plaintext password, so create it owner-readable only
    # rather than with whatever the default umask would allow.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        yaml.dump(cfg, f)

    return cfg

def load_config() -> Dict:
    """Return the bot configuration, creating it interactively if absent.

    The location comes from the ``FEEDBOT_CONFIG`` environment variable and
    falls back to ``./cfg.yaml``. An existing file is parsed as YAML; a
    missing one is produced by ``stub_config`` and its result returned.
    """
    config_path = os.environ.get("FEEDBOT_CONFIG", "./cfg.yaml")

    if os.path.exists(config_path):
        with open(config_path, 'r') as handle:
            return yaml.safe_load(handle)

    return stub_config(config_path)

class Worker:
    """Long-running bot that relays new feed entries into Matrix rooms.

    Attributes:
        config: Mapping returned by ``load_config()``.
        client: ``AsyncClient`` built from the configured homeserver and
            username.
        cache: Mapping of feed URL to the set of entry links already posted.
        last_fetch: Naive local ``datetime`` of the last completed refresh;
            starts at the epoch so the first loop iteration refreshes.
    """

    def __init__(self):
        self.config = load_config()
        self.client = AsyncClient(self.config["credentials"]["homeserver"],
                                  self.config["credentials"]["username"])
        self.cache = self.load_cache()
        self.last_fetch = datetime.datetime.fromtimestamp(0)

    # make async
    def persist_cache(self):
        """Write the posted-links cache to ``FEEDBOT_CACHE`` (``./cache.yaml``)."""
        # hoist
        path = os.environ.get("FEEDBOT_CACHE", "./cache.yaml")
        with open(path, 'w') as f:
            return yaml.safe_dump(self.cache, f)

    def load_cache(self):
        """Return the posted-links cache, merged with any persisted state.

        Every configured feed gets an empty set; entries read from the
        cache file override those defaults.
        """
        path = os.environ.get("FEEDBOT_CACHE", "./cache.yaml")
        cache = {feed: set() for feed in self.config.get("feeds") or []}

        if os.path.exists(path):
            with open(path, 'r') as f:
                # An empty file parses to None; treat it as "nothing stored".
                stored = yaml.safe_load(f) or {}
            # Coerce each value to a set so ``.add`` works even if the file
            # stored a list or a null for some feed.
            cache.update({url: set(links or ()) for url, links in stored.items()})

        return cache

    async def message_callback(self, room: MatrixRoom, event: RoomMessageText) -> None:
        """Handle an incoming room text message (currently a no-op)."""
        pass

    @staticmethod
    def _build_message(feed, entry):
        """Return the ``(plain_text, html)`` bodies announcing ``entry``.

        Feed and entry strings come from remote documents, so they are
        HTML-escaped before being placed into markup. A missing title falls
        back to the corresponding link.
        """
        feed_meta = feed['feed']
        feed_link = str(feed_meta.get('link') or '')
        feed_title = str(feed_meta.get('title') or feed_link)
        entry_link = str(entry['link'])
        entry_title = str(entry.get('title') or entry_link)

        text = f'New in {feed_title}: {entry_link} - {entry_title}'
        formatted = (
            f'New in <a href="{html.escape(feed_link)}">{html.escape(feed_title)}</a>: '
            f'<a href="{html.escape(entry_link)}">{html.escape(entry_title)}</a>'
        )
        return text, formatted

    @staticmethod
    def _entry_time(entry):
        """Return the entry's timestamp as an aware UTC datetime, or ``None``.

        ``None`` is returned when the entry carries no parsed date or when
        the date is before 1970.
        """
        parsed = entry.get('updated_parsed') or entry.get('published_parsed')
        if parsed is None or parsed.tm_year <= 1969:
            return None
        # The parsed struct_time is in UTC, so convert with timegm;
        # time.mktime would interpret it as local time and skew the result
        # by the local UTC offset.
        return datetime.datetime.fromtimestamp(
            calendar.timegm(parsed), tz=datetime.timezone.utc
        )

    async def send_message(self, feed, entry):
        """Post an announcement for ``entry`` of ``feed`` to every configured room."""
        text, formatted = self._build_message(feed, entry)

        print(text)
        print(formatted)
        # TKTKTK configure which rooms a feed goes to.
        # TKTKTK configure how often to scrape a feed
        for room_id in self.config['rooms']:
            print(await self.client.room_send(
                room_id=room_id,
                message_type="m.room.message",
                content={"msgtype": "m.text",
                         "format": "org.matrix.custom.html",
                         "formatted_body": formatted,
                         "body": text},
            ))

    async def fetch(self, session, url):
        """Return the response body of a GET to ``url`` as text."""
        async with session.get(url) as response:
            return await response.text()

    async def maybe_fetch_feeds(self):
        """Fetch every statically configured feed, one after another.

        greetz to https://stackoverflow.com/questions/23847555/asynchronous-feedparser-requests
        """
        for feed in self.config.get("feeds") or []:
            await self.fetch_feed_url(feed)

    async def maybe_fetch_dynamic_feeds(self):
        """Fetch the feeds listed by each ``dynamic_urls`` endpoint.

        Each endpoint is expected to return a list of objects carrying a
        ``url`` key.
        """
        for feed_config in self.config.get('dynamic_urls') or []:
            async with aiohttp.ClientSession() as session:
                data = await self.fetch(session, feed_config)
                data = yaml.safe_load(data) or []  # it's json and json is yaml so hahaha
                for feed in data:
                    await self.fetch_feed_url(feed['url'])

    async def fetch_feed_url(self, feed):
        """Fetch one feed URL and post its entries that are new and recent.

        An entry is posted when it has a link not yet in the cache and a
        timestamp less than 24 hours old. Entries are posted oldest first
        and their links recorded in the cache.
        """
        async with aiohttp.ClientSession() as session:
            data = await self.fetch(session, feed)
        rss = feedparser.parse(data)
        print(f"got {feed} w/ {len(rss['entries'])}")

        seen = self.cache.setdefault(feed, set())
        now = datetime.datetime.now(tz=datetime.timezone.utc)
        max_age = datetime.timedelta(hours=24)

        fresh = []
        for entry in rss['entries']:
            link = entry.get('link')
            stamp = self._entry_time(entry)
            # Entries without a link or a usable date cannot be tracked or
            # aged, so skip them instead of aborting the whole refresh.
            if link is None or stamp is None:
                continue
            if str(link) not in seen and now - stamp < max_age:
                fresh.append((stamp, entry))

        fresh.sort(key=lambda pair: pair[0])
        for _, entry in fresh:
            link = str(entry['link'])
            if link in seen:
                # The same link appeared twice in this fetch; post it once.
                continue
            await self.send_message(rss, entry)
            seen.add(link)

    async def main(self) -> None:
        """Log in, join the configured rooms, then refresh feeds hourly.

        Between refreshes the loop keeps syncing with the homeserver using
        a 30 second timeout per sync call.
        """
        self.client.add_event_callback(self.message_callback, RoomMessageText)

        print(await self.client.login(self.config["credentials"]["password"]))
        for room_id in self.config["rooms"]:
            await self.client.join(room_id)
        while True:
            td = datetime.datetime.now() - self.last_fetch
            if td > datetime.timedelta(hours=1):
                print("refreshing...")
                await self.maybe_fetch_feeds()
                await self.maybe_fetch_dynamic_feeds()
                self.persist_cache()
                self.last_fetch = datetime.datetime.now()
                print("ahh...")
            await self.client.sync(timeout=30*1000)

# Build the worker and drive its main loop until the process is stopped.
w = Worker()
# asyncio.run creates and closes its own event loop; calling
# asyncio.get_event_loop() with no running loop is deprecated.
asyncio.run(w.main())

NEXT detangle

NEXT feed -> roomlist mappings? so that my dev feeds can also go to my devlog rooms

NEXT commands in message_callback

INPROGRESS tag feeds based on fetch recency rather than using a sleep so that the commands don't block

  • State "INPROGRESS" from "NEXT" [2023-04-01 Sat 00:23]

NEXT store access_token in config

NEXT deploy to The Wobserver

NEXT register bot user