# afdsew/afdsew.py
#
# 208 lines
# 6.9 KiB
# Python
# Executable File

#!/usr/local/bin/python
# [[id:20210921T173735.986594][AFDSEW is a single python file::1]]
import click
import logging
logging.basicConfig(level=logging.INFO, format="%(asctime)-15s %(message)s")
import pprint
import glob
import bisect
from bisect import bisect_right
import re
import string
import datetime
from io import StringIO
import arrow
import subprocess
import sys
import os
from os import path
import shutil
import markdown, feedgen.feed as feed
def format_afd_content(content):
    """Reflow hard-wrapped, ALL-CAPS AFD text into readable paragraphs.

    A single newline (a hard wrap inside a paragraph) becomes a space,
    paragraph breaks (blank lines) are preserved, and every resulting
    line is sentence-cased on ". " boundaries.
    """
    unwrapped = re.sub("(?<!\n)\n(?!\n)", " ", content)
    cased_lines = [string.capwords(line, ". ") for line in unwrapped.splitlines()]
    return "\n".join(cased_lines) + "\n"
def parse_afd_time(time_string):
    """Parse an AFD issuance line such as '400 PM PDT Mon Sep 27 2021'.

    Returns a timezone-aware datetime.
    """
    parsed = arrow.get(time_string, "hmm A ZZZ ddd MMM D YYYY")
    return parsed.datetime
def pformat_time(timestamp):
    """Render a timestamp as e.g. '04:00 PM Monday September 27'."""
    return "{:%I:%M %p %A %B %d}".format(timestamp)
def parse_afd(afd):
    """Parse a raw NWS Area Forecast Discussion product into sections.

    Returns {"timestamp": aware datetime of issuance,
             "content": {SECTION_NAME: section text, ..., "TIME": raw line}}.

    Raises ValueError if the issuance header is missing, and IndexError
    if a section header has no terminator after it (malformed product).
    """
    # Undo the product's continuation-line indenting ("\n " -> "\n").
    afd = "\n".join(afd.split("\n "))
    # Section headers look like ".SHORT TERM...". Raw strings so the
    # escaped dots aren't invalid string escapes (SyntaxWarning on 3.12+).
    headers = {
        m.group(1): m.span()
        for m in re.finditer(r"^\.([^.]*)\.\.\.", afd, re.MULTILINE)
    }
    # A section ends at the next "&&" terminator or the next header start,
    # whichever comes first after the header itself.
    terminators = [m.start() for m in re.finditer(r"\n&&\n", afd)]
    endpoints = sorted(set(terminators + [start for start, _ in headers.values()]))
    # Content of each section runs from just past its header to the
    # closest endpoint strictly after it.
    header_result_spans = {
        name: (content_start, endpoints[bisect_right(endpoints, content_start)])
        for name, (_, content_start) in headers.items()
    }
    afd_data = {
        name: afd[start:end].strip()
        for name, (start, end) in header_result_spans.items()
    }
    issued = re.search(
        "Area Forecast Discussion\nNational Weather Service Seattle WA\n(.*)\n",
        afd)
    if issued is None:
        # Fail with a clear message instead of AttributeError on None.
        raise ValueError("AFD product is missing its issuance header")
    afd_data["TIME"] = issued.group(1)
    return {
        "timestamp": parse_afd_time(afd_data["TIME"]),
        "content": afd_data,
    }
def _write_afd_section(out, heading, body):
    """Write one markdown section: heading, dashed underline, formatted body."""
    out.write(heading + "\n")
    out.write("-" * len(heading) + "\n")
    out.write(format_afd_content(body))
    out.write("\n")


def format_afd(afd):
    """Render a parsed AFD (see parse_afd) as a markdown document.

    The formatted issuance time is the title, the synopsis (or update)
    follows, then the SHORT TERM / LONG TERM sections, then all remaining
    sections in sorted order.
    """
    afd_sections = afd["content"]
    meta_sections = ["TIME", "SYNOPSIS"]
    main_section_names = ["SHORT TERM", "LONG TERM"]
    # Headers carry suffixes like "SHORT TERM /TONIGHT THROUGH .../",
    # so match by prefix pattern.
    main_sections = []
    for pattern in main_section_names:
        main_sections.extend(glob.fnmatch.filter(afd_sections.keys(), pattern + "*"))
    formatted_AFD = StringIO()
    title = pformat_time(afd["timestamp"])
    formatted_AFD.write(title + "\n")
    # Size the underline from the title actually written; the original
    # sized it from the raw TIME string, which has a different length.
    formatted_AFD.write("=" * len(title) + "\n")
    synopsis_raw = afd_sections.get("SYNOPSIS") or afd_sections.get("UPDATE") or ""
    formatted_AFD.write(format_afd_content(synopsis_raw) + "\n")
    for heading in main_sections:
        _write_afd_section(formatted_AFD, heading, afd_sections[heading])
    # Remaining sections sorted for deterministic output (bare set
    # iteration order is arbitrary between runs).
    remaining = set(afd_sections).difference(main_sections + meta_sections)
    for heading in sorted(remaining):
        _write_afd_section(formatted_AFD, heading, afd_sections[heading])
    return formatted_AFD.getvalue()
import hashlib
def setup_afd_feed(result_dir, afd_entries):
    """Render afd_entries into result_dir and build the Atom feed.

    For each parsed AFD (see parse_afd) writes <eid>.md and <eid>.html,
    mirrors the newest entry's HTML to latest.html, and emits AFDSEW.xml.
    result_dir must already exist. Returns result_dir.
    """
    # TODO: Generate index.html from md stored in raw_SEW instead
    afd_feed = feed.FeedGenerator()
    afd_feed.title("NWS Seattle Area Forecast Discussion")
    afd_feed.link(href="https://afd.fontkeming.fail/SEW/current.md", rel="self")
    afd_feed.id('https://afd.fontkeming.fail')
    afd_feed.author(name="Ryan Rix", email="ry@n.rix.si")
    afd_feed.description("NWS Seattle Area Forecast Discussion")
    current = None
    current_html = ""
    # Newest first, so the first iteration is the "current" discussion.
    for afd_entry in sorted(afd_entries, reverse=True, key=lambda e: e["timestamp"]):
        # BUG FIX: was "%H%m" — the trailing directive was month, not
        # minute — so two discussions issued within the same hour got the
        # same id and overwrote each other. "%M" keeps ids distinct.
        eid = afd_entry["timestamp"].strftime("%y-%m-%d-%H%M")
        if not current:
            # The newest entry drives the feed-level updated stamp.
            afd_feed.updated(afd_entry["timestamp"])
            current = eid
        entry_md = format_afd(afd_entry)
        logging.debug("Rendered entry md:\n%s", entry_md)
        entry_md_file = path.join(result_dir, eid + ".md")
        logging.info("Writing entry file: %s", entry_md_file)
        with open(entry_md_file, "w") as md_out:
            md_out.write(entry_md)
        entry_html = markdown.markdown(entry_md)
        entry_html_file = path.join(result_dir, eid + ".html")
        logging.info("Writing entry html file: %s", entry_html_file)
        with open(entry_html_file, "w") as html_out:
            html_out.write(entry_html)
        item = afd_feed.add_entry()
        if not current_html:
            current_html = entry_html
        item.title(pformat_time(afd_entry["timestamp"]))
        item.link(href=("https://afd.fontkeming.fail/" + eid + ".html"))
        item.description(entry_html)
        item.summary(entry_html)  # XXX: would be nice to only have the summary here...
        item.pubDate(afd_entry["timestamp"])
        item.updated(afd_entry["timestamp"])
        item.guid(eid)
        item.id(eid)
    logging.info("Writing current: %s", current)
    with open(path.join(result_dir, "latest.html"), "w") as latest_out:
        latest_out.write(current_html)
    logging.info("Rendering feed file: %s", path.join(result_dir, "AFDSEW.xml"))
    afd_feed.atom_file(path.join(result_dir, "AFDSEW.xml"))
    return result_dir
def fetch_feed_files(search_prefix):
    """Return the paths of all archived raw AFD products under search_prefix."""
    # Archived products live at <prefix>/raw_SEW/AFDSEW.<epoch>.
    pattern = search_prefix + "/raw_SEW/AFDSEW.*"
    return glob.glob(pattern)
def is_new_afd(text, search_prefix):
    """Return True if `text` (bytes) matches no archived product's digest.

    Compares SHA-224 digests against every file fetch_feed_files finds
    under search_prefix.
    """
    new_digest = hashlib.sha224(text).hexdigest()
    for fname in fetch_feed_files(search_prefix):
        with open(fname, "rb") as f:
            # Renamed from `hash`, which shadowed the builtin.
            existing_digest = hashlib.sha224(f.read()).hexdigest()
        if existing_digest == new_digest:
            return False
    return True
# Directory containing this script; used as the default --prefix for all
# CLI commands below.
script_root_dir = path.dirname( path.abspath(__file__) )
@click.group(invoke_without_command=True, chain=True)
@click.pass_context
@click.option("--prefix", default=script_root_dir)
def cli(ctx, prefix):
    """Run fetch then generate when invoked with no subcommand."""
    if ctx.invoked_subcommand is None:
        # fetch/generate are click Command objects; calling them directly
        # (fetch(prefix)) re-enters click's main() with the prefix string
        # as argv. ctx.invoke calls the underlying callbacks with the
        # option bound correctly.
        ctx.invoke(fetch, prefix=prefix)
        ctx.invoke(generate, prefix=prefix)
@cli.command()
@click.option("--prefix", default=script_root_dir)
def generate(prefix):
    """Parse every archived raw AFD under prefix and rebuild the SEW tree."""
    logging.info("Generating")
    afd_entries = []
    for feed_file in fetch_feed_files(prefix):
        # Context manager closes each file promptly; the original
        # open(s).read() leaked one handle per archived product.
        with open(feed_file) as f:
            afd_entries.append(parse_afd(f.read()))
    logging.info("Parsed %s entries.", len(afd_entries))
    setup_afd_feed(path.join(prefix, "SEW"), afd_entries)
@cli.command()
@click.option("--prefix", default=script_root_dir)
def fetch(prefix):
    """Download the current SEW AFD and archive it if it is new."""
    logging.info("Fetching")
    url = ("https://www.wrh.noaa.gov/total_forecast/getprod.php?"
           "new&wfo=sew&sid=SEW&pil=AFD&toggle=textonly")
    # Pass argv as a list with shell=False: no shell string interpolation,
    # no quoting/injection concerns.
    rsp = subprocess.check_output(["curl", url])
    rsp_str = rsp.decode()
    afd = parse_afd(rsp_str)
    ts = afd.get("timestamp")
    # strftime("%s") is a non-portable glibc extension and ignores the
    # datetime's own timezone; derive the epoch suffix explicitly
    # (ts is timezone-aware, so timestamp() is exact).
    suffix = str(int(ts.timestamp()))
    if is_new_afd(rsp, prefix):
        with open(prefix + "/raw_SEW/AFDSEW." + suffix, "w") as f:
            f.write(rsp_str)
    logging.info("Done")
# Script entry point: dispatch to the click CLI group.
if __name__ == "__main__":
    cli()
# AFDSEW is a single python file::1 ends here