feedbot2/matrix_feedbot/feedbot.py

206 lines
7.0 KiB
Python

# [[file:../../../org/cce/matrix-feedbot.org::*The Worker Class and Loop][The Worker Class and Loop:1]]
import asyncio
import calendar
import datetime
import html
import os
import time # i hate python!!
from typing import Dict

import aiohttp
import feedparser
import yaml
from nio import AsyncClient, MatrixRoom, RoomMessageText
class YamlBackedDict():
    """Small dict-like wrapper around a mapping persisted in a YAML file.

    On construction the file at ``cfg_path`` is loaded; when it is missing
    or empty, ``stub_fn`` is called (with no arguments) to produce the
    initial contents, which are written to the file straight away.
    Changes made through item assignment only reach the disk when
    :meth:`save` is called.
    """

    def __init__(self, cfg_path: str, stub_fn):
        self.path = cfg_path
        self.stub_generator = stub_fn
        self._dict = self.load()

    def __setitem__(self, k, v):
        self._dict[k] = v

    def __getitem__(self, k):
        return self._dict[k]

    def __str__(self):
        return str(self._dict)

    def get(self, k, default=None):
        """Return the value stored under *k*, or *default* when absent."""
        return self._dict.get(k, default)

    def stub(self):
        """Generate the initial contents, write them to disk and return them."""
        cfg = self.stub_generator()
        with open(self.path, 'w') as f:
            # safe_dump, like save(): only emit what safe_load can read back.
            yaml.safe_dump(cfg, f)
        return cfg

    def load(self):
        """Return the file's contents, stubbing it when missing or empty."""
        if os.path.exists(self.path):
            with open(self.path, 'r') as f:
                cfg = yaml.safe_load(f)
            # safe_load yields None for an empty document; treat that like
            # a missing file instead of handing None to __getitem__/get.
            if cfg is not None:
                return cfg
        return self.stub()

    def save(self):
        """Write the current contents back to the YAML file."""
        with open(self.path, 'w') as f:
            return yaml.safe_dump(self._dict, f)
class Worker():
    """Matrix bot that polls feeds and announces new entries in rooms.

    Three YAML files back the bot; their locations come from environment
    variables with defaults in the working directory:

    * ``FEEDBOT_CONFIG`` (``./cfg.yaml``): ``rooms``, ``feeds`` and
      ``dynamic_urls``.
    * ``FEEDBOT_CREDENTIALS_CONFIG`` (``./creds.yaml``): homeserver,
      username, password and, after the first login, the access token
      and device id.
    * ``FEEDBOT_CACHE`` (``./cache.yaml``): per feed URL, the set of entry
      links that were already posted.
    """

    # Minimum time between two refreshes of all feeds in main().
    REFRESH_INTERVAL = datetime.timedelta(hours=1)
    # Entries whose timestamp is older than this are never announced.
    MAX_ENTRY_AGE = datetime.timedelta(hours=24)

    def __init__(self):
        path = os.environ.get("FEEDBOT_CONFIG", "./cfg.yaml")
        cred_path = os.environ.get("FEEDBOT_CREDENTIALS_CONFIG", "./creds.yaml")
        cache_path = os.environ.get("FEEDBOT_CACHE", "./cache.yaml")
        self.config = YamlBackedDict(path, self.stub_config)
        self.credentials = YamlBackedDict(cred_path, self.stub_creds)
        self.cache = YamlBackedDict(cfg_path=cache_path, stub_fn=self.stub_cache)
        # Give every configured feed a "seen links" set while keeping
        # whatever was loaded from disk (an empty cache file loads as None).
        seen_defaults = {feed: set() for feed in self.config["feeds"]}
        self.cache._dict = {**seen_defaults, **(self.cache._dict or {})}
        self.client = AsyncClient(self.credentials["homeserver"],
                                  self.credentials["username"])
        # Epoch start, so the first pass through main() refreshes at once.
        self.last_fetch = datetime.datetime.fromtimestamp(0)

    async def main(self) -> None:
        """Log in, then alternate forever between feed refreshes and syncs."""
        await self.login()
        self.client.add_event_callback(self.message_callback, RoomMessageText)
        while True:
            elapsed = datetime.datetime.now() - self.last_fetch
            if elapsed > self.REFRESH_INTERVAL:
                print("refreshing...")
                await self.maybe_fetch_feeds()
                await self.maybe_fetch_dynamic_feeds()
                self.cache.save()
                self.last_fetch = datetime.datetime.now()
                print("ahh...")
            await self.client.sync(timeout=30*1000)

    # The stub_* functions are handed to YamlBackedDict as bound methods,
    # so their single parameter receives this Worker instance.
    def stub_creds(ybd) -> Dict:
        """Interactively ask for the Matrix credentials."""
        return dict(
            username=input("Enter the Matrix username: "),
            password=input("Enter the Matrix password: "),
            homeserver=input("Enter the Matrix homeserver URL: "),
        )

    def stub_config(ybd) -> Dict:
        """Interactively build a config with one room and no feeds."""
        return dict(
            rooms=[
                input("Enter the first room ID to join (starts with ! not #): "),
            ],
            feeds=[],
            dynamic_urls=[],
        )

    def stub_cache(ybd) -> Dict:
        """Return an empty cache; __init__ adds the per-feed sets."""
        # would be nice if this could include the set() logic in __init__ but...
        return dict()

    async def message_callback(self, room: "MatrixRoom", event: "RoomMessageText") -> None:
        """Handle an incoming room text message (currently a no-op)."""
        pass

    @staticmethod
    def _render_message(feed, entry):
        """Return the ``(plain_text, html)`` bodies announcing *entry*.

        *feed* is the parsed feed (``feed['feed']`` holds link and title),
        *entry* one of its entries. Titles and links come from the remote
        feed, so they are escaped before being placed in the HTML body.
        """
        f_link = feed['feed']['link']
        f_title = feed['feed'].get('title', f_link)
        e_link = entry['link']
        e_title = entry.get('title', entry.get('link'))

        def esc(value):
            return html.escape(str(value))

        html_body = f'''
New in <a href="{esc(f_link)}">{esc(f_title)}</a>:
<a href="{esc(e_link)}">{esc(e_title)}</a>
'''
        text = f'New in {f_title}: {e_title} - {e_link}'
        return text, html_body

    async def send_message(self, feed, entry):
        """Post an announcement for *entry* to every configured room."""
        text, html_body = self._render_message(feed, entry)
        print(text)
        print(html_body)
        # TKTKTK configure which rooms a feed goes tooo.
        # TKTKTK configure how often to scrape a feed
        for room_id in self.config['rooms']:
            print(await self.client.room_send(
                room_id=room_id,
                message_type="m.room.message",
                content={"msgtype": "m.text",
                         "format": "org.matrix.custom.html",
                         "formatted_body": html_body,
                         "body": text},
            ))

    async def maybe_fetch_feeds(self):
        """Fetch every feed listed under ``feeds`` in the config."""
        for feed in self.config["feeds"]:
            await self.fetch_feed_url(feed)

    async def maybe_fetch_dynamic_feeds(self):
        """Fetch the feeds listed by each ``dynamic_urls`` document.

        Each dynamic URL is expected to return a list of mappings with a
        ``url`` key, one per feed.
        """
        for feed_config in self.config.get('dynamic_urls') or []:
            async with aiohttp.ClientSession() as session:
                data = await self.fetch(session, feed_config)
            data = yaml.safe_load(data) # it's json and json is yaml so hahaha
            # An empty response parses to None; treat it as "no feeds".
            for feed in data or []:
                self.cache[feed['url']] = self.cache.get(feed['url'], set())
                await self.fetch_feed_url(feed['url'])

    @classmethod
    def _is_recent(cls, entry, now=None):
        """Tell whether *entry* is dated within the last MAX_ENTRY_AGE.

        *now* is a POSIX timestamp and defaults to the current time.
        Entries without a timestamp, or dated before 1970, are not recent.
        """
        stamp = entry.get('updated_parsed')
        if stamp is None or stamp.tm_year <= 1969:
            return False
        if now is None:
            now = time.time()
        # feedparser normalises parsed dates to UTC, so the inverse of
        # gmtime (calendar.timegm) is needed here; time.mktime would read
        # the tuple as local time and shift the window by the UTC offset.
        return now - calendar.timegm(stamp) < cls.MAX_ENTRY_AGE.total_seconds()

    async def fetch_feed_url(self, feed):
        """
        Download and parse *feed*, then announce its recent unseen entries.

        Entries are posted oldest first; each posted link is recorded in the
        cache, which is saved immediately.

        greetz to https://stackoverflow.com/questions/23847555/asynchronous-feedparser-requests
        """
        async with aiohttp.ClientSession() as session:
            data = await self.fetch(session, feed)
        rss = feedparser.parse(data)
        print(f"got {feed} w/ {len(rss['entries'])}")

        seen = self.cache.get(feed)
        if seen is None:
            seen = self.cache[feed] = set()

        # An entry without a link can neither be announced nor deduplicated.
        fresh = [
            entry for entry in rss['entries']
            if self._is_recent(entry)
            and entry.get('link') is not None
            and entry['link'] not in seen
        ]
        fresh.sort(key=lambda entry: entry['updated_parsed'])
        for entry in fresh:
            link = str(entry['link'])
            if link in seen:
                # Same link twice in one fetch: announce it only once.
                continue
            await self.send_message(rss, entry)
            seen.add(link)
            # if a post is made, we need to save the cache so that we don't spam our friends
            self.cache.save()

    async def fetch(self, session, url):
        """Return the body of *url* as text, using the given aiohttp session."""
        async with session.get(url) as response:
            return await response.text()

    async def login(self) -> None:
        """Authenticate the client, then join every configured room.

        Without a stored access token a password login is performed and the
        resulting token and device id are saved to the credentials file;
        otherwise the stored values are applied to the client.

        Raises:
            RuntimeError: the homeserver's login response has no access token.
        """
        if self.credentials.get("access_token") is None:
            login_resp = await self.client.login(self.credentials["password"])
            token = getattr(login_resp, "access_token", None)
            if token is None:
                # An error response has no token; fail with the response
                # itself instead of an opaque AttributeError.
                raise RuntimeError(f"Matrix login failed: {login_resp}")
            self.credentials["access_token"] = token
            self.credentials["device_id"] = login_resp.device_id
            self.credentials.save()
        else:
            # NOTE(review): only token and device id are restored here;
            # confirm whether the client also needs its user_id set.
            self.client.access_token = self.credentials["access_token"]
            self.client.device_id = self.credentials["device_id"]
        for room_id in self.config["rooms"]:
            await self.client.join(room_id)
def run():
    """Entry point: build a Worker and run its main loop until it stops."""
    async def _main():
        # Construct the Worker inside the running loop so that anything it
        # creates in __init__ belongs to the loop that will run it.
        await Worker().main()

    # asyncio.run creates and closes the event loop itself; calling
    # get_event_loop() with no running loop is deprecated.
    asyncio.run(_main())
# The Worker Class and Loop:1 ends here