133 lines
3.4 KiB
Python
133 lines
3.4 KiB
Python
from dataclasses import dataclass
|
|
from typing import Optional
|
|
|
|
from fastapi import Request
|
|
from starlette import routing
|
|
import sqlmodel
|
|
|
|
from arcology.parse import parse_sexp, print_sexp
|
|
from arcology.sites import sites, Site
|
|
from arcology.config import get_settings, Environment
|
|
from arcology.sites import host_to_site
|
|
|
|
import logging

# Module-level logger. Its level is pinned to DEBUG, so the "no path match" /
# "no host match" diagnostics emitted in this module are not filtered out by
# this logger itself.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Two compiled patterns for the same catch-all route: the first requires a
# trailing slash, the second does not. compile_path yields three values here;
# only the first (the compiled regex, used via .match()) is kept.
route_regexp, _, _ = routing.compile_path("/{sub_key:path}/")
route_regexp2, _, _ = routing.compile_path("/{sub_key:path}")
|
|
|
|
@dataclass
class ArcologyKey:
    """
    A page key of the form ``site_key/rest#anchor_id``, split into its parts.

    The constructor parses ``key``; the ``from_*`` static constructors build a
    key from an incoming request (or a host/path pair), and ``to_url`` turns
    the key back into a URL.
    """

    # The full, unparsed key string as passed to the constructor.
    key: str
    # Everything before the first "/" in ``key``.
    site_key: str
    # Result of ``sites.get(site_key)``; None when the site key is unknown.
    site: Optional[Site]
    # Text between the first "/" and the first "#" that follows it.
    rest: str = ""
    # Text after that "#" (up to a further "#", if any); None when absent.
    anchor_id: Optional[str] = None

    def __init__(self, key: str, site_key="", rest="", anchor_id=None):
        """
        Parse ``key`` into ``site_key``, ``rest`` and ``anchor_id``.

        ``site_key`` is accepted for backward compatibility but is always
        replaced by the value parsed from ``key``. ``rest`` and ``anchor_id``
        act as defaults that are kept only when ``key`` has no "/" (for
        ``rest``) or no "#" after that "/" (for ``anchor_id``).
        """
        self.key = key
        self.rest = rest
        self.anchor_id = anchor_id

        # Only the FIRST "/" separates the site from the rest; later slashes
        # belong to ``rest``. A "#" before the first "/" is not special.
        head, slash, remainder = key.partition("/")
        self.site_key = head
        self.site = sites.get(self.site_key, None)

        if slash:
            # Every "#" after the slash starts a new piece; only the first two
            # pieces are used, anything after a second "#" is dropped.
            pieces = remainder.split("#")
            self.rest = pieces[0]
            if len(pieces) > 1:
                self.anchor_id = pieces[1]

    def to_url(self) -> str:
        """
        Build a URL for this key.

        When the site has domains for the current ``arcology_env`` the URL is
        ``https://<first domain>/<rest>``; otherwise it is
        ``http://localhost:8000/<key>``. A site that could not be resolved is
        treated like a site without domains instead of raising
        AttributeError. The anchor, if any, is appended as ``#<anchor_id>``.
        """
        env = get_settings().arcology_env

        domains = None
        if self.site is not None:
            domains = self.site.domains.get(env, None)
        else:
            logger.debug("no site for key: %s", self.key)

        if domains is not None:
            url = "https://{domain}/{rest}".format(domain=domains[0], rest=self.rest)
        else:
            url = "http://localhost:8000/{key}".format(key=self.key)

        if self.anchor_id is not None:
            url = url + "#" + self.anchor_id

        return url

    @staticmethod
    def from_request(request: Request) -> Optional["ArcologyKey"]:
        """
        Build a key from a request's URL path and its ``host`` header.

        Returns None under the same conditions as ``from_host_and_path``.
        """
        path = request.url.path
        host = request.headers.get('host')
        return ArcologyKey.from_host_and_path(host, path)

    @staticmethod
    def from_host_and_path(host: str, path: str) -> Optional["ArcologyKey"]:
        """
        Build a key from a host name and a URL path.

        The path is matched against the trailing-slash pattern first and the
        plain pattern second; the host is resolved with ``host_to_site``.
        An empty sub-path maps to ``index``. Returns None (after a debug log)
        when either the path or the host does not match.
        """
        m = route_regexp.match(path) or route_regexp2.match(path)
        if m is None:
            logger.debug("no path match: %s", path)
            return None
        sub_key = m.group("sub_key")

        site = host_to_site(host)
        if site is None:
            logger.debug("no host match: %s", host)
            return None

        if not sub_key:
            sub_key = "index"
        return ArcologyKey(f"{site.key}/{sub_key}")
|
|
|
|
def id_to_arcology_key(id: str, session: sqlmodel.Session) -> Optional[ArcologyKey]:
    """
    Given a node ID, return the ARCOLOGY_KEY for the node.

    The ID is passed through ``print_sexp`` before being compared with
    ``Node.node_id``. The key is built from the node's page; when the node's
    ``level`` is not 0 the node ID is also set as the key's ``anchor_id``.

    Returns None when no node matches or the matching node has no page.
    Raises Exception when more than one node matches the ID.
    """
    # Imported at call time rather than at module load
    # (presumably to avoid a circular import -- TODO confirm).
    from .arroyo import Node

    linked_node_query = sqlmodel.select(Node).where(Node.node_id == print_sexp(id))
    matches = session.exec(linked_node_query).all()

    if not matches:
        return None
    if len(matches) > 1:
        raise Exception(f"more than one key for node? {id}")

    linked_node = matches[0]
    linked_page = linked_node.page
    if linked_page is None:
        return None

    ret = ArcologyKey(key=parse_sexp(linked_page.key))
    if linked_node.level != 0:
        ret.anchor_id = id
    return ret
|
|
|
|
def file_to_arcology_key(file: str, session: sqlmodel.Session) -> Optional[ArcologyKey]:
    """
    Given a file, return the ARCOLOGY_KEY of the page recorded for it.

    ``file`` is passed through ``print_sexp`` before being compared with
    ``Page.file``; the first matching page is used. Returns None when no
    page matches.
    """
    # Imported at call time rather than at module load
    # (presumably to avoid a circular import -- TODO confirm).
    from .arroyo import Page

    key_q = sqlmodel.select(Page).where(Page.file == print_sexp(file))
    page = session.exec(key_q).first()
    if page is None:
        return None

    return ArcologyKey(key=parse_sexp(page.key))
|