Interfacing with the Arcology
- Ingest files in bulk using the command line
- Generating code using the Arroyo Generators
- Ingest files on-demand using Syncthing
- Create Base Sites
#+AUTO_TANGLE: vars:org-babel-default-header-args
This page is the public API and grymoire of common commands and literate-programming patterns.
Ingest files in bulk using the command line
This command is a bit complicated, but working through it is worthwhile. It seeds a database for the Arcology by parsing each document and persisting the relevant metadata.
import os.path
import glob
from itertools import filterfalse
import pdb
import graphlib
import pathlib
from generators.models import DependencyRelationship
from django.core.management.base import BaseCommand, CommandError
from django.utils.module_loading import import_string
import arroyo.arroyo_rs as native
import logging
logger = logging.getLogger(__name__)
# from roam.models import File, Keyword, Heading, Tag, Reference
from roam.models import File
from roam.core import persist_one_file, arroyo_persist_one_file, parse_doc
from roam.core import should_file_persist
class Command(BaseCommand):
help = 'Ingest files in to the Roam database'
output_transaction = True
requires_migrations_checks = True
It acts on a single argument, the directory containing the org-roam files.
def add_arguments(self, parser):
parser.add_argument('source_dir', type=str, default="~/org", nargs='?')
def handle(self, *args, **kwargs):
src = os.path.expanduser(kwargs['source_dir'])
It finds any .org files which are regular files, not symlinks.
files_glob = "**/*.org"
files = glob.glob(files_glob, root_dir=src, recursive=True)
files = map(lambda it: pathlib.Path(src).joinpath(it).expanduser(), files)
files = filter(lambda it: it.is_file(), files)
files = filter(lambda it: not it.is_symlink(), files)
files = map(lambda it: str(it), files)
files = list(files)
logger.info(f"Collected {len(files)} files for parsing.")
It goes over each of these, checking whether the document has been ingested before, whether it needs to be updated, or whether it is an invalid document that must be skipped. roam.core.should_file_persist
returns a tuple with the state needed to decide what to do with a document, and that action is taken here.
docs = []
persisted_paths = []
sorter = graphlib.TopologicalSorter()
for path in files:
is_existing, need_update, doc = should_file_persist(path)
if is_existing and need_update:
existing = File.objects.get(path=path)
existing.delete()
assert doc is not None
elif not is_existing and need_update:
assert doc is not None
elif is_existing and not need_update:
logger.debug(f"Skipping already-parsed file at {path}")
continue
elif not is_existing and not need_update:
logger.debug(f"Skipping invalid doc at {path}.")
continue
The command will extract ordering information from the document, namely the ARROYO_MODULE_WANTS
and ARROYO_MODULE_WANTED
keywords in the file; these are used when running The Arroyo Generators, which may have relationships with objects that aren't created yet. Extracting the dependency ordering in to a graphlib.TopologicalSorter
ensures the dependencies are in place beforehand. Each doc is fed in to persist_one_file which will go over each model enumerated in the Project Configuration's ARCOLOGY_EXTRACTORS
and persist the elements from the parsed doc in to the DB, model by model.
DependencyRelationship.extract_ordering_from_arroyo(sorter, doc)
the_file = persist_one_file(doc)
docs.append(the_file)
persisted_paths.append(path)
Finally, as mentioned, the ordered ARROYO_EXTRACTORS are run.
for path in sorter.static_order():
logger.debug(f"Attempting to parse {path}.")
doc = parse_doc(path)
arroyo_persist_one_file(doc)
logger.info(f"Scanned {len(files)} files and parsed/persisted {len(docs)} docs.")
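The dependency pass above can be sketched in isolation. Here is a minimal, self-contained example of how graphlib.TopologicalSorter yields a safe processing order; the document names and WANTS relationships are hypothetical, invented for illustration:

```python
import graphlib

# Hypothetical documents: each one WANTS its dependency parsed first.
# add(node, *predecessors) records that `node` depends on the others.
sorter = graphlib.TopologicalSorter()
sorter.add("emacs.org")                 # no dependencies
sorter.add("cce.org", "emacs.org")      # cce.org WANTS emacs.org
sorter.add("arcology.org", "cce.org")   # arcology.org WANTS cce.org

# static_order() emits dependencies before their dependents, so the
# generator pass never touches a doc whose prerequisites are missing.
order = list(sorter.static_order())
```

Because the dependencies here form a single chain, only one order is valid: emacs.org, then cce.org, then arcology.org.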
Generating code using the Arroyo Generators
This is the interface for The Arroyo Generators to generate Nix, Emacs Init, etc files from metadata in the databases.
import argparse
import pathlib
import sys
from django.core.management.base import BaseCommand, CommandError
from django.conf import settings
from django.utils.module_loading import import_string
from generators.models import DependencyRelationship
import logging
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = 'Export Nix and other code known by the DB'
requires_migrations_checks = True
def add_arguments(self, parser):
parser.add_argument('-m', '--module', type=str, help="The __class__.__name__ of the Arroyo Generator Module to collect")
parser.add_argument('-r', '--role', default="endpoint", type=str, help="A role to restrict the modules to")
parser.add_argument('-d', '--destination', default="-", type=str, help="Location to output generated content to. pass `-' for stdout.")
parser.add_argument('--dry-run', default=False, action=argparse.BooleanOptionalAction, help="Only list the org-files included in the export")
Invocation is simple: you pass it a module name specified in the ARROYO_EXTRACTORS
section of the Project Configuration, a role to restrict the modules to, and a destination to write the file to, or - for stdout. If you want to just list the org files that match, pass --dry-run.
So shell:python manage.py generate -m nixos -r server -d - will generate the import statements for The Wobserver's configuration.
The trick with the handler here is that every one of the Generator models implements two methods: a classmethod collect_for_babel,
which is used to return entities that are tied to that generator and the specified role, and an instance method to_babel,
which is used to express the code.
Both of these functions have a decent base implementation in the Generator
abstract base class, but can be overridden as necessary. For example, the EmacsSnippet
class ensures these are returned in an ordered fashion, and that the contents of the files are exposed, whereas the Nix files can just be returned in any order, and only the file name needs to be referenced.
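That contract can be sketched without Django at all. The class below is hypothetical (the real generators are Django models backed by the ORM, elided here); it only shows the shape of the collect_for_babel / to_babel pairing described above:

```python
# Hypothetical, ORM-free sketch of the Generator contract: a class-level
# collector plus a per-instance code emitter.
class FakeNixSnippet:
    registry = []  # stand-in for the Django table backing the model

    def __init__(self, path, role):
        self.path = path
        self.role = role
        self.registry.append(self)

    @classmethod
    def collect_for_babel(cls, role):
        # Return entities tied to this generator and role; Nix files
        # need no particular ordering, so a plain filter suffices.
        return [obj for obj in cls.registry if obj.role == role]

    def to_babel(self):
        # For Nix, only the file name needs to be referenced.
        return self.path

FakeNixSnippet("/nix/modules/server.nix", "server")
FakeNixSnippet("/nix/modules/laptop.nix", "endpoint")
server_paths = [o.to_babel() for o in FakeNixSnippet.collect_for_babel("server")]
```

An EmacsSnippet-style subclass would instead override collect_for_babel to sort its results and to_babel to inline file contents, per the description above.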
The command basically just looks up the Generator module to load, imports it, and then calls these functions to generate an output to write to the destination:
def handle(self, *args, **kwargs):
dry_run = kwargs.get('dry_run', False)
dest_arg = kwargs['destination']
if dest_arg == "-":
dest = sys.stdout
else:
dest = pathlib.Path(dest_arg).expanduser()
dest = open(dest, "w")
module = kwargs['module']
role = kwargs['role']
output = self.gen_generic(module, role, dry_run)
dest.write(output)
dest.close()
logger.info(f"Wrote {len(output)} chars.")
def gen_generic(self, module, role, dry_run=False):
mod_name = settings.ARROYO_EXTRACTORS.get(module, None)
if mod_name is None:
raise CommandError(f"Module {module} not found.")
mod = import_string(mod_name)
objs_in_role = mod.collect_for_babel(role=role)
logger.info(f"Tangling {len(objs_in_role)} entries for {role} role now.")
if dry_run:
return '\n'.join([
gen_obj.org_file.path
for gen_obj in objs_in_role
])
return '\n'.join([
gen_obj.to_babel()
for gen_obj in objs_in_role
])
roam/management/__init__.py (this isn't needed)
roam/management/commands/__init__.py (this isn't needed)
Elisp Helpers
These will be the "public" interfaces for the generators, a set of Emacs functions that can be loaded in to the environment to call out to the management commands:
;; (defun arroyo-collect-files (module &optional role)
;; (setenv "ARCOLOGY_DB_PATH" "/home/rrix/org/arcology-django/db.sqlite3")
;; (let* ((flake-path "path:/home/rrix/org/arcology-django")
;; (cmd (s-join " " (list (format "nix run %s#arcology" flake-path)
;; "--"
;; "generate -m"
;; module
;; (when role
;; (format "-r %s" role))
;; "--dry-run"
;; "2>/dev/null"
;; " | sort")))
;; (output (shell-command-to-string cmd)))
;; output))
(defun arroyo-generate-imports (module &optional role destination do-sort)
(arcology-api-generator module role destination do-sort))
(defun arroyo-generate-imports-formatted (format-fn output)
(thread-last (arroyo-generate-imports "home-manager" "server")
(s-split "\n")
(-map format-fn)
(butlast)
(s-join "\n")))
Ingest files on-demand using Syncthing
shell:python manage.py watchsync will start a Syncthing client that will monitor the org directory for changes to ingest in to the database. While it will be easier when this thing is integrated with The Complete Computing Environment, it can be run for now with async-forever thus:
(defun arroyo-start-ingester ()
(interactive)
(setenv "ARCOLOGY_DB_PATH" "/home/rrix/org/arcology-django/db.sqlite3")
(let* ((key (getenv "ARCOLOGY_SYNCTHING_KEY"))
(flake-path "path:/home/rrix/org/arcology-django")
(cmd (format "ARCOLOGY_SYNCTHING_KEY=%s nix run %s#arcology -- watchsync" key flake-path)))
(cce/async-forever cmd "*arroyo-ingester*")))
(arroyo-start-ingester)
from django.core.management.base import BaseCommand, CommandError
from django.core.management import call_command
from django.conf import settings
import json
import pathlib
import urllib.parse
import urllib.request
import polling
import logging
logger = logging.getLogger(__name__)
# logger.setLevel(logging.DEBUG)
import roam.models
The command goes in to a simple loop, with polling
used to ensure that transient errors are retried and the loop does not die without good cause. The Syncthing API key is loaded from the Project Configuration and the path to monitor is passed in as an argument. Note that this requires the path to match a Syncthing directory! If you are monitoring a folder which is a subdirectory of a shared folder, make sure to pass the root of the shared folder in instead, or break the path in to its own Syncthing share.
This is a synchronous use of an HTTP Long Polling API so this should be invoked in its own worker rather than in the web process!
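The retry behaviour that polling provides can be illustrated with a small stdlib-only stand-in. This is a sketch of the semantics the command relies on, not the library's implementation; flaky_batch is a hypothetical target standing in for get_one_event_batch:

```python
import time

def poll_forever(target, step):
    """Stdlib sketch of polling.poll(..., poll_forever=True): keep
    calling the target, sleeping `step` seconds between attempts,
    until it returns a truthy value."""
    while True:
        result = target()
        if result:
            return result
        time.sleep(step)

# A target that fails twice before yielding an event ID, standing in
# for get_one_event_batch returning the last seen Syncthing event ID.
attempts = []
def flaky_batch():
    attempts.append(1)
    return 42 if len(attempts) >= 3 else None

result = poll_forever(flaky_batch, step=0)
```

Because the loop only exits on a truthy return, a batch that yields no usable pagination ID is simply retried on the next step.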
class Command(BaseCommand):
help = 'Watch the local syncthing instance for changes to ingest'
requires_migrations_checks = True
def add_arguments(self, parser):
parser.add_argument('-s', '--source_dir', type=str, default="~/org")
parser.add_argument('-f', '--folder_id', type=str,
help="Find directory to watch by this Syncthing Folder instead of source_dir.")
def handle(self, *args, **kwargs):
self.key = settings.SYNCTHING_KEY
assert self.key is not None
if kwargs.get("folder_id") is not None and len(kwargs.get("folder_id")) > 0:
self.folder_id = kwargs['folder_id']
self.expanded_path = self.get_path_for_folder_id(self.folder_id)
logger.info(f"fetched folder ID {self.folder_id} AKA {self.expanded_path} from syncthing API")
else:
self.expanded_path = pathlib.Path(kwargs['source_dir']).expanduser()
self.folder_id = self.get_folder_id_for_path(self.expanded_path)
logger.info(f"fetched folder ID {self.folder_id} AKA {self.expanded_path} from syncthing API")
It would be nice to get the pagination key from the DB to have persistence across invocations, but re-running the ingester on startup when there aren't necessarily changes isn't really that bad; it'll just cause some disk IO while it checks file hashes:
since = None
logger.info(f"fetching events from API now.")
while True:
since = polling.poll(
lambda: self.get_one_event_batch(since),
step=30,
poll_forever=True
)
The watchsync
command queries the Syncthing Configuration API to pull the folder ID or path out, whichever you didn't ask for in the Command's kwargs:
def _get_folders(self):
url = f"http://localhost:8384/rest/config/folders"
req = urllib.request.Request(
url,
headers = {"X-API-Key": self.key}
)
rsp = urllib.request.urlopen(req)
return json.load(rsp)
def get_folder_id_for_path(self, path: pathlib.Path) -> str:
jason = self._get_folders()
the_folder = next(filter(lambda folder: folder["path"] == str(path), jason))
return the_folder["id"]
def get_path_for_folder_id(self, folder_id: str) -> pathlib.Path:
jason = self._get_folders()
the_folder = next(filter(lambda folder: folder["id"] == folder_id, jason))
return pathlib.Path(the_folder["path"])
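The folder lookup amounts to a linear scan over the configuration response. Given a sample of the JSON shape (abridged, with hypothetical IDs and paths), the next(filter(...)) pattern the command uses pairs IDs and paths like this:

```python
import pathlib

# Abridged, hypothetical shape of GET /rest/config/folders.
folders = [
    {"id": "org-folder", "path": "/home/rrix/org"},
    {"id": "photos", "path": "/home/rrix/photos"},
]

# First match wins; next() raises StopIteration if nothing matches,
# which surfaces a misconfigured share immediately.
def folder_id_for_path(folders, path):
    return next(f for f in folders if f["path"] == str(path))["id"]

def path_for_folder_id(folders, folder_id):
    return pathlib.Path(next(f for f in folders if f["id"] == folder_id)["path"])

org_id = folder_id_for_path(folders, pathlib.Path("/home/rrix/org"))
photos_path = path_for_folder_id(folders, "photos")
```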
The functionality to query the Syncthing Events API, extract events and filter them to decide whether the Arcology ingestfiles Command should be invoked is pretty straightforward use of urllib
.
def get_one_event_batch(self, since: str|None, timeout: int = 120):
params = urllib.parse.urlencode(dict(
since=since,
timeout=timeout,
))
url = f"http://localhost:8384/rest/events/disk?{params}"
req = urllib.request.Request(
url,
headers = {"X-API-Key": self.key}
)
rsp = urllib.request.urlopen(req, None, timeout=timeout)
jason = json.load(rsp)
ingest = False
last_since = None
for event in jason:
last_since = event.get("id")
data = event.get("data", {})
event_folder_id = data.get("folder", "")
file_path = pathlib.Path(event.get("data", {}).get("path"))
For each file path, it checks that the file name is not a temporary file name, that it ends with .org,
and that the change was made in the folder we are monitoring. Anything else is elided.
ingest_this = True
if file_path.name.startswith(".#"):
logger.debug("skip temp file")
ingest_this = False
elif file_path.suffix != ".org":
logger.debug(f"skip non-org file {file_path.suffix}")
ingest_this = False
elif event_folder_id != self.folder_id:
logger.debug(f"skip unmonitored folder {event_folder_id}")
ingest_this = False
If a file is deleted, make sure it's removed from the database. Note that this is still liable to miss files; eventually I will want to add a management command to go over the whole file listing and true up the database, but this is good enough for now as long as it's kept running in realtime.
elif data.get("action") == "deleted":
final_path = self.expanded_path.joinpath(file_path)
try:
f = roam.models.File.objects.get(path=final_path)
logger.debug(f"deleting {final_path}!")
f.delete()
ingest_this = False
except roam.models.File.DoesNotExist:
pass
# add new failure cases here.
logger.debug(f"proc {last_since} {event_folder_id}, ingest? {ingest}, ingest_this? {ingest_this}: {json.dumps(event)}")
if ingest_this:
ingest = ingest_this
logger.debug(f"{event}")
if ingest:
call_command('ingestfiles', self.expanded_path)
return last_since
The command returns the last event ID of the request which is used to paginate future requests made in the loop.
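The skip rules above can be pulled out into a pure function for illustration. This is a sketch, not the command's actual structure, and the deletion branch is elided; the event dicts mirror the Syncthing disk-event shape consumed above:

```python
import pathlib

def should_ingest(event, monitored_folder_id):
    """Apply the filtering rules from the loop above to one disk event."""
    data = event.get("data", {})
    path = pathlib.Path(data.get("path", ""))
    if path.name.startswith(".#"):        # Emacs lockfile / temp file
        return False
    if path.suffix != ".org":             # only org documents matter
        return False
    if data.get("folder") != monitored_folder_id:
        return False                      # change in an unmonitored share
    return True

assert should_ingest({"data": {"path": "journal/2024.org", "folder": "org"}}, "org")
assert not should_ingest({"data": {"path": ".#2024.org", "folder": "org"}}, "org")
assert not should_ingest({"data": {"path": "notes.txt", "folder": "org"}}, "org")
```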
DONE make sure we are able to handle deletions here…
- State "DONE" from "NEXT" [2024-03-13 Wed 15:37]
Create Base Sites
This needs to be run to define the base sites when the server DB is created. Eventually these need to be specified outside of the code so that a user can extend the Arcology with new sites and domains more easily.
from django.core.management.base import BaseCommand, CommandError
from django.conf import settings
import json
import arcology.models
import roam.models
import generators.models
import logging
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = 'Seed site objects, etc.'
output_transaction = True
requires_migrations_checks = True
def handle(self, *args, **kwargs):
generator_roles = generators.models.GeneratorRole.get_or_create_many(settings.ARROYO_ROLES)
sites_path = settings.BASE_DIR / "arcology/settings/sites.json"
with open(sites_path, "r") as f:
sites = json.load(f)
for site in sites:
the_site = arcology.models.Site.objects.create(
key=site["key"],
title=site["title"],
css_file=site["css_file"],
link_color=site["link_color"],
)
for domain in site["domains"]:
the_site.sitedomain_set.create(domain=domain)
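For reference, sites.json is expected to be a list of objects carrying the keys read above (key, title, css_file, link_color, domains). A hypothetical entry, with invented values, might look like:

```json
[
  {
    "key": "arcology",
    "title": "The Arcology Garden",
    "css_file": "arcology.css",
    "link_color": "#aa0000",
    "domains": ["arcology.garden", "localhost"]
  }
]
```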