206 lines
7.0 KiB
Python
206 lines
7.0 KiB
Python
# [[file:../../../org/cce/matrix-feedbot.org::*The Worker Class and Loop][The Worker Class and Loop:1]]
|
|
import asyncio
|
|
import aiohttp
|
|
import feedparser
|
|
import os
|
|
from typing import Dict
|
|
import datetime
|
|
import time # i hate python!!
|
|
|
|
from nio import AsyncClient, MatrixRoom, RoomMessageText
|
|
|
|
import yaml
|
|
|
|
class YamlBackedDict:
    """A small dict-like wrapper whose contents live in a YAML file.

    On construction the file at ``cfg_path`` is loaded. If it does not
    exist, ``stub_fn()`` is called to produce initial contents, which are
    written to disk immediately. Changes made through ``__setitem__`` are
    only persisted when :meth:`save` is called.
    """

    def __init__(self, cfg_path: str, stub_fn):
        """
        :param cfg_path: path of the backing YAML file.
        :param stub_fn: zero-argument callable returning the initial contents
            used when ``cfg_path`` does not exist yet.
        """
        self.path = cfg_path
        self.stub_generator = stub_fn
        self._dict = self.load()

    def __setitem__(self, k, v):
        self._dict[k] = v

    def __getitem__(self, k):
        return self._dict[k]

    def __str__(self):
        return str(self._dict)

    def get(self, k, default=None):
        """Return the value for ``k``, or ``default`` if it is absent."""
        return self._dict.get(k, default)

    def stub(self):
        """Generate initial contents via the stub function, persist and return them."""
        cfg = self.stub_generator()
        self._write(cfg)
        return cfg

    def load(self):
        """Read the backing file, creating it from the stub if it is missing.

        :returns: the parsed YAML document; an empty file yields ``{}``.
        """
        if not os.path.exists(self.path):
            return self.stub()
        with open(self.path, 'r', encoding='utf-8') as f:
            cfg = yaml.safe_load(f)
        # An empty file parses to None; normalise it so callers can always
        # index into / merge with the result.
        return {} if cfg is None else cfg

    def save(self):
        """Persist the current contents to the backing file."""
        self._write(self._dict)

    def _write(self, data):
        """Atomically dump ``data`` as YAML to ``self.path``.

        The document is written to a sibling temp file and then moved into
        place with ``os.replace``. A crash mid-write therefore cannot leave
        a truncated config/cache behind.
        """
        tmp_path = f"{self.path}.tmp"
        with open(tmp_path, 'w', encoding='utf-8') as f:
            yaml.safe_dump(data, f)
        os.replace(tmp_path, self.path)
|
|
|
|
class Worker:
    """Matrix bot that polls RSS/Atom feeds and posts new entries to rooms.

    The config, credentials and seen-links cache are YAML files. Their paths
    come from the ``FEEDBOT_CONFIG``, ``FEEDBOT_CREDENTIALS_CONFIG`` and
    ``FEEDBOT_CACHE`` environment variables, defaulting to ``./cfg.yaml``,
    ``./creds.yaml`` and ``./cache.yaml``. Missing config/credential files
    are created interactively by the ``stub_*`` methods.
    """

    # How often the main loop re-fetches all feeds.
    REFRESH_INTERVAL = datetime.timedelta(hours=1)
    # Entries whose timestamp is older than this are never posted.
    MAX_ENTRY_AGE = datetime.timedelta(hours=24)

    def __init__(self):
        path = os.environ.get("FEEDBOT_CONFIG", "./cfg.yaml")
        cred_path = os.environ.get("FEEDBOT_CREDENTIALS_CONFIG", "./creds.yaml")
        cache_path = os.environ.get("FEEDBOT_CACHE", "./cache.yaml")

        self.config = YamlBackedDict(path, self.stub_config)
        self.credentials = YamlBackedDict(cred_path, self.stub_creds)

        # Every statically configured feed starts with an empty seen-links set.
        # Anything already persisted in the cache file takes precedence.
        cache = {feed: set() for feed in self.config.get("feeds") or []}
        self.cache = YamlBackedDict(cfg_path=cache_path, stub_fn=self.stub_cache)
        self.cache._dict = {**cache, **(self.cache._dict or {})}

        self.client = AsyncClient(self.credentials["homeserver"],
                                  self.credentials["username"])
        # Epoch start, so the first loop iteration refreshes immediately.
        self.last_fetch = datetime.datetime.fromtimestamp(0)

    async def main(self) -> None:
        """Log in, then alternate between hourly feed refreshes and Matrix syncs."""
        await self.login()
        self.client.add_event_callback(self.message_callback, RoomMessageText)

        while True:
            if datetime.datetime.now() - self.last_fetch > self.REFRESH_INTERVAL:
                print("refreshing...")
                await self.maybe_fetch_feeds()
                await self.maybe_fetch_dynamic_feeds()
                self.cache.save()
                self.last_fetch = datetime.datetime.now()
                print("ahh...")
            await self.client.sync(timeout=30*1000)

    def stub_creds(self) -> Dict:
        """Interactively prompt for Matrix credentials (used when creds file is missing)."""
        return dict(
            username=input("Enter the Matrix username: "),
            password=input("Enter the Matrix password: "),
            homeserver=input("Enter the Matrix homeserver URL: "),
        )

    def stub_config(self) -> Dict:
        """Interactively build a minimal bot config (used when config file is missing)."""
        return dict(
            rooms=[
                input("Enter the first room ID to join (starts with ! not #): "),
            ],
            feeds=[],
            dynamic_urls=[],
        )

    def stub_cache(self) -> Dict:
        """Return an empty cache.

        The per-feed ``set()`` entries are filled in by ``__init__`` and
        ``fetch_feed_url``.
        """
        return dict()

    async def message_callback(self, room: "MatrixRoom", event: "RoomMessageText") -> None:
        """Handle incoming room messages (currently a no-op)."""
        pass

    @staticmethod
    def _format_message(feed, entry, feed_url=None):
        """Build the ``(plain_text, html)`` bodies announcing ``entry``.

        Feed and entry titles/links come from remote feeds. They are
        HTML-escaped before being interpolated into the formatted body.
        ``feed_url`` is used as the feed link when the parsed feed has none.
        """
        from html import escape

        meta = feed.get('feed') or {}
        f_link = meta.get('link') or feed_url or ''
        f_title = meta.get('title') or f_link

        e_link = entry['link']
        e_title = entry.get('title') or e_link

        text = f'New in {f_title}: {e_title} - {e_link}'
        formatted = (
            f'New in <a href="{escape(f_link)}">{escape(f_title)}</a>:\n'
            f'<a href="{escape(e_link)}">{escape(e_title)}</a>'
        )
        return text, formatted

    async def send_message(self, feed, entry, feed_url=None):
        """Announce ``entry`` from the parsed ``feed`` in every configured room.

        :param feed: the feedparser result the entry came from.
        :param entry: a single feedparser entry; must have a ``link``.
        :param feed_url: optional feed URL, used as the feed link fallback.
        """
        text, formatted = self._format_message(feed, entry, feed_url)

        print(text)
        print(formatted)
        # TODO: configure which rooms a feed goes to.
        # TODO: configure how often to scrape a feed.
        for room_id in self.config['rooms']:
            print(await self.client.room_send(
                room_id=room_id,
                message_type="m.room.message",
                content={"msgtype": "m.text",
                         "format": "org.matrix.custom.html",
                         "formatted_body": formatted,
                         "body": text},
            ))

    async def _fetch_feed_safely(self, feed_url):
        """Fetch one feed, logging network/HTTP errors instead of raising.

        One unreachable feed must not take down the whole bot loop.
        """
        try:
            await self.fetch_feed_url(feed_url)
        except (aiohttp.ClientError, asyncio.TimeoutError) as err:
            print(f"failed to fetch {feed_url}: {err!r}")

    async def maybe_fetch_feeds(self):
        """Fetch every statically configured feed."""
        for feed in self.config.get("feeds") or []:
            await self._fetch_feed_safely(feed)

    async def maybe_fetch_dynamic_feeds(self):
        """Fetch feed lists from each ``dynamic_urls`` endpoint, then each listed feed.

        Each endpoint is expected to return a list of objects with a ``url``
        key.
        """
        for feed_config in self.config.get('dynamic_urls') or []:
            try:
                async with aiohttp.ClientSession() as session:
                    data = await self.fetch(session, feed_config)
                # It's JSON, and PyYAML parses (most) JSON as YAML.
                feeds = yaml.safe_load(data) or []
            except (aiohttp.ClientError, asyncio.TimeoutError, yaml.YAMLError) as err:
                print(f"failed to load dynamic feed list {feed_config}: {err!r}")
                continue
            for feed in feeds:
                await self._fetch_feed_safely(feed['url'])

    @staticmethod
    def _entry_time(entry):
        """Return the entry's update (or publish) time as an aware UTC datetime.

        feedparser's ``*_parsed`` values are ``time.struct_time`` in UTC. They
        are converted with ``calendar.timegm`` rather than ``time.mktime``,
        because ``mktime`` would interpret them as local time. Returns
        ``None`` when the entry has no usable date, or when the year is not
        after 1969 (e.g. a zero placeholder timestamp).
        """
        import calendar

        parsed = entry.get('updated_parsed') or entry.get('published_parsed')
        if parsed is None or parsed.tm_year <= 1969:
            return None
        return datetime.datetime.fromtimestamp(calendar.timegm(parsed),
                                               tz=datetime.timezone.utc)

    @classmethod
    def _select_new_entries(cls, seen, entries, now=None):
        """Return entries to post, oldest first.

        An entry qualifies when it has a link not in ``seen``, and its
        timestamp is within ``MAX_ENTRY_AGE`` of ``now`` (an aware UTC
        datetime; defaults to the current time). A link repeated within
        ``entries`` is only returned once.
        """
        if now is None:
            now = datetime.datetime.now(datetime.timezone.utc)
        queued = set()
        fresh = []
        for entry in entries:
            link = entry.get('link')
            when = cls._entry_time(entry)
            if link is None or when is None or link in seen or link in queued:
                continue
            if now - when < cls.MAX_ENTRY_AGE:
                queued.add(link)
                fresh.append((when, entry))
        fresh.sort(key=lambda pair: pair[0])
        return [entry for _, entry in fresh]

    async def fetch_feed_url(self, feed):
        """Fetch and parse the feed at ``feed``, posting any new, recent entries.

        greetz to https://stackoverflow.com/questions/23847555/asynchronous-feedparser-requests
        """
        async with aiohttp.ClientSession() as session:
            data = await self.fetch(session, feed)
        rss = feedparser.parse(data)
        entries = rss.get('entries') or []

        print(f"got {feed} w/ {len(entries)}")

        seen = self.cache.get(feed)
        if seen is None:
            seen = set()
            self.cache[feed] = seen

        for entry in self._select_new_entries(seen, entries):
            await self.send_message(rss, entry, feed_url=feed)
            seen.add(str(entry['link']))
            # if a post is made, we need to save the cache so that we don't spam our friends
            self.cache.save()

    async def fetch(self, session, url):
        """GET ``url`` and return the body as text.

        Raises ``aiohttp.ClientResponseError`` on HTTP error statuses, so
        error pages are never parsed as feeds.
        """
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.text()

    async def login(self) -> None:
        """Log in (or reuse a stored access token), then join the configured rooms.

        :raises RuntimeError: if a fresh password login is rejected.
        """
        if self.credentials.get("access_token") is None:
            from nio import LoginResponse

            login_resp = await self.client.login(self.credentials["password"])
            if not isinstance(login_resp, LoginResponse):
                # nio returns a LoginError instead of raising. Without this
                # check the attribute access below fails confusingly.
                raise RuntimeError(f"Matrix login failed: {login_resp}")
            self.credentials["access_token"] = login_resp.access_token
            self.credentials["device_id"] = login_resp.device_id
            self.credentials.save()
        else:
            self.client.access_token = self.credentials["access_token"]
            self.client.device_id = self.credentials["device_id"]

        for room_id in self.config["rooms"]:
            await self.client.join(room_id)
|
|
|
|
|
|
def run():
    """Entry point: build a Worker and run its main loop until interrupted."""
    worker = Worker()

    async def _main():
        try:
            await worker.main()
        finally:
            # Close the nio client's HTTP session so shutdown is clean.
            await worker.client.close()

    # asyncio.run creates and tears down its own loop. Calling
    # asyncio.get_event_loop() with no running loop is deprecated.
    asyncio.run(_main())
|
|
# The Worker Class and Loop:1 ends here
|