arcology/interfaces.org

Interfacing with the Arcology

#+AUTO_TANGLE: vars:org-babel-default-header-args

public API and grymoire of common commands and literate-programming patterns

Ingest files in bulk using the command line

This work is a bit complicated, but working through it will help. This is the command which seeds a database for the Arcology by parsing each document and persisting relevant metadata.

import os.path
import glob
from itertools import filterfalse
import pdb
import graphlib
import pathlib

from generators.models import DependencyRelationship

from django.core.management.base import BaseCommand, CommandError
from django.utils.module_loading import import_string

import arroyo.arroyo_rs as native

import logging
logger = logging.getLogger(__name__)

# from roam.models import File, Keyword, Heading, Tag, Reference
from roam.models import File
from roam.core import persist_one_file, arroyo_persist_one_file, parse_doc
from roam.core import should_file_persist
class Command(BaseCommand):
    help = 'Ingest files into the Roam database'
    output_transaction = True
    requires_migrations_checks = True

It acts on a single argument, the directory containing the org-roam files.
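For example, python manage.py ingestfiles ~/org ingests the default org directory; the argument is optional and defaults to ~/org.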

    def add_arguments(self, parser):
        parser.add_argument('source_dir', type=str, default="~/org", nargs='?')

    def handle(self, *args, **kwargs):
        src = os.path.expanduser(kwargs['source_dir'])

It finds all .org files which are regular files, not symlinks.

        files_glob = "**/*.org"
        files = glob.glob(files_glob, root_dir=src, recursive=True)
        files = map(lambda it: pathlib.Path(src).joinpath(it).expanduser(), files)
        files = filter(lambda it: it.is_file(), files)
        files = filter(lambda it: not it.is_symlink(), files)
        files = map(lambda it: str(it), files)
        files = list(files)

        logger.info(f"Collected {len(list(files))} files for parsing.")

It goes over each of these, checking whether the document has been ingested before, whether it needs to be updated, or whether it is an invalid document that must be skipped. roam.core.should_file_persist returns a tuple with the state needed to decide what to do with a document, and that action is taken here.

        docs = []
        persisted_paths = []
        sorter = graphlib.TopologicalSorter()
        for path in files:
            is_existing, need_update, doc = should_file_persist(path)
            if is_existing == True and need_update == True:
                existing = File.objects.get(path=path)
                existing.delete()
                assert doc is not None
            elif is_existing == False and need_update == True:
                assert doc is not None
            elif is_existing == True and need_update == False:
                logger.debug(f"Skipping already-parsed file at {path}")
                continue
            elif is_existing == False and need_update == False:
                logger.debug(f"Skipping invalid doc at {path}.\n")
                continue

The command extracts ordering information from the document, namely the ARROYO_MODULE_WANTS and ARROYO_MODULE_WANTED keywords in the file; these are used when running The Arroyo Generators, which may have relationships with objects that haven't been created yet. Extracting the dependency ordering into a graphlib.TopologicalSorter ensures the dependencies are in place beforehand. Each doc is fed into persist_one_file, which goes over each model enumerated in the Project Configuration's ARCOLOGY_EXTRACTORS and persists the elements from the parsed doc into the DB, model by model.

            DependencyRelationship.extract_ordering_from_arroyo(sorter, doc)
            the_file = persist_one_file(doc)
            docs.append(the_file)
            persisted_paths.append(path)

Finally, as mentioned, the ordered ARROYO_EXTRACTORS are run.

        for path in sorter.static_order():
            logger.debug(f"Attempting to parse {path}.")
            doc = parse_doc(path)
            arroyo_persist_one_file(doc)
        logger.info(f"Scanned {len(files)} and parsed/persisted {len(docs)} docs.")

Generating code using the Arroyo Generators

This is the interface for The Arroyo Generators to generate Nix, Emacs init, and other files from metadata in the database.

import argparse
import pathlib
import sys

from django.core.management.base import BaseCommand, CommandError
from django.conf import settings
from django.utils.module_loading import import_string

from generators.models import DependencyRelationship

import logging
logger = logging.getLogger(__name__)

class Command(BaseCommand):
    help = 'Export Nix and other code known by the DB'
    requires_migrations_checks = True

    def add_arguments(self, parser):
        parser.add_argument('-m', '--module', type=str, help="The __class__.__name__ of the Arroyo Generator Module to collect")
        parser.add_argument('-r', '--role', default="endpoint", type=str, help="A role to restrict the modules to")
        parser.add_argument('-d', '--destination', default="-", type=str, help="Location to output generated content to. pass `-' for stdout.")
        parser.add_argument('--dry-run', default=False, action=argparse.BooleanOptionalAction, help="Only list the org-files included in the export")

Invocation is simple: you pass it a module name specified in the ARROYO_EXTRACTORS section of the Project Configuration, a role to restrict the modules to, and a destination to write the file to (or stdout). If you just want to list the org files that match, pass --dry-run.

So python manage.py generate -m nixos -r server -d - will generate the import statements for The Wobserver's configuration.

The trick with the handler here is that every one of the Generator models implements two methods: a classmethod collect_for_babel which returns the entities tied to that generator and the specified role, and an instance method to_babel which expresses the code.

Both of these functions have a decent base implementation in the Generator abstract base class, but can be overridden as necessary. For example, the EmacsSnippet class ensures these are returned in an ordered fashion, and that the contents of the files are exposed, whereas the Nix files can just be returned in any order, and only the file name needs to be referenced.
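As a rough illustration of that contract (this sketch is not part of the command below or the real model code; the base-class import path, class name, and query are assumptions), a concrete generator might look something like:

from generators.models import Generator  # assumed home of the abstract base class

class NixModule(Generator):  # hypothetical concrete generator
    @classmethod
    def collect_for_babel(cls, role: str):
        # Return the entities tied to this generator and the requested role;
        # the filter assumes a role relation that may not match the real schema.
        return cls.objects.filter(roles__role=role)

    def to_babel(self) -> str:
        # For Nix files, only the path of the tangled file needs to be referenced.
        return str(self.org_file.path)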

The command basically just looks up the Generator module to load, imports it, and then calls these functions to generate an output to write to the destination:

    def handle(self, *args, **kwargs):
        dry_run = kwargs.get('dry_run', False)
        dest_arg = kwargs['destination']
        if dest_arg == "-":
            dest = sys.stdout
        else:
            dest = pathlib.Path(dest_arg).expanduser()
            dest = open(dest, "w")

        module = kwargs['module']
        role = kwargs['role']

        output = self.gen_generic(module, role, dry_run)

        dest.write(output)
        dest.close()
        logger.info(f"Wrote {len(output)} chars.")


    def gen_generic(self, module, role, dry_run=False):
        mod_name = settings.ARROYO_EXTRACTORS.get(module, None)
        if mod_name is None:
            raise CommandError(f"Module {module} not found.")
        mod = import_string(mod_name)

        objs_in_role = mod.collect_for_babel(role=role)
        logger.info(f"Tangling {len(objs_in_role)} entries for {role} role now.")

        if dry_run:
            return '\n'.join([
                    gen_obj.org_file.path
                    for gen_obj in objs_in_role
                ])

        return '\n'.join([
            gen_obj.to_babel()
            for gen_obj in objs_in_role
        ])

roam/management/__init__.py and roam/management/commands/__init__.py are also tangled (their contents aren't needed).

Elisp Helpers

These will be the "public" interfaces for the generators: a set of Emacs functions that can be loaded into the environment to call out to the management commands:

;; (defun arroyo-collect-files (module &optional role)
;;   (setenv "ARCOLOGY_DB_PATH" "/home/rrix/org/arcology-django/db.sqlite3")
;;   (let* ((flake-path "path:/home/rrix/org/arcology-django")
;;          (cmd (s-join " " (list (format "nix run %s#arcology" flake-path)
;;                                 "--"
;;                                 "generate -m"
;;                                 module
;;                                 (when role
;;                                   (format "-r %s" role))
;;                                 "--dry-run"
;;                                 "2>/dev/null"
;;                                 " | sort")))
;;          (output (shell-command-to-string cmd)))
;;     output))

(defun arroyo-generate-imports (module &optional role destination  do-sort)
  (arcology-api-generator module role destination do-sort))

(defun arroyo-generate-imports-formatted (format-fn output)
  (thread-last (arroyo-generate-imports "home-manager" "server")
               (s-split "\n")
               (-map format-fn)
               (butlast)
               (s-join "\n")))

Ingest files on-demand using Syncthing

python manage.py watchsync will start a Syncthing client that monitors the org directory for changes to ingest into the database. While it will be easier once this is integrated with The Complete Computing Environment, for now it can be run with async-forever thus:

(defun arroyo-start-ingester ()
  (interactive)
  (setenv "ARCOLOGY_DB_PATH" "/home/rrix/org/arcology-django/db.sqlite3")
  (let* ((key (getenv "ARCOLOGY_SYNCTHING_KEY"))
         (flake-path "path:/home/rrix/org/arcology-django")
         (cmd (format "ARCOLOGY_SYNCTHING_KEY=%s nix run %s#arcology -- watchsync" key flake-path)))
    (cce/async-forever cmd "*arroyo-ingester*")))
(arroyo-start-ingester)

from django.core.management.base import BaseCommand, CommandError
from django.core.management import call_command
from django.conf import settings

import json
import pathlib
import urllib.parse
import urllib.request

import polling

import logging
logger = logging.getLogger(__name__)
# logger.setLevel(logging.DEBUG)

import roam.models

The command goes into a simple loop, using the polling library to ensure that transient errors are retried and the loop does not die without good cause. The Syncthing API key is loaded from the Project Configuration and the path to monitor is passed in as an argument. Note that this requires the path to match a Syncthing directory! If you are monitoring a folder which is a subdirectory of a shared folder, either pass the root of the shared folder instead or break the path out into its own Syncthing share.

This is a synchronous use of an HTTP Long Polling API so this should be invoked in its own worker rather than in the web process!

class Command(BaseCommand):
    help = 'Watch the local syncthing instance for changes to ingest'
    requires_migrations_checks = True

    def add_arguments(self, parser):
        parser.add_argument('-s', '--source_dir', type=str, default="~/org")
        parser.add_argument('-f', '--folder_id', type=str,
                            help="Find directory to watch by this Syncthing Folder instead of source_dir.")

    def handle(self, *args, **kwargs):
        self.key = settings.SYNCTHING_KEY
        assert self.key != None
        if kwargs.get("folder_id") is not None and len(kwargs.get("folder_id")) > 0:
            self.folder_id = kwargs['folder_id']
            self.expanded_path = self.get_path_for_folder_id(self.folder_id)
            logger.info(f"fetched folder ID {self.folder_id} AKA {self.expanded_path} from syncthing API")
        else:
            self.expanded_path = pathlib.Path(kwargs['source_dir']).expanduser()
            self.folder_id = self.get_folder_id_for_path(self.expanded_path)
            logger.info(f"fetched folder ID {self.folder_id} AKA {self.expanded_path} from syncthing API")

It would be nice to get the pagination key from the DB for persistence across invocations, but re-running the ingester on startup when there aren't necessarily any changes isn't really that bad; it'll just cause some disk IO while it checks file hashes:

        since = None
        logger.info(f"fetching events from API now.")
        while True:
            since = polling.poll(
                lambda: self.get_one_event_batch(since),
                step=30,
                poll_forever=True
            )

The watchsync command queries the Syncthing Configuration API to pull the folder ID or path out, whichever you didn't ask for in the Command's kwargs:

    def _get_folders(self):
        url = f"http://localhost:8384/rest/config/folders"
        req = urllib.request.Request(
            url,
            headers = {"X-API-Key": self.key}
        )
        rsp = urllib.request.urlopen(req)
        return json.load(rsp)

    def get_folder_id_for_path(self, path: pathlib.Path) -> str:
        jason = self._get_folders()
        the_folder = next(filter(lambda folder: folder["path"] == str(path), jason))
        return the_folder["id"]

    def get_path_for_folder_id(self, folder_id: str) -> pathlib.Path:
        jason = self._get_folders()
        the_folder = next(filter(lambda folder: folder["id"] == folder_id, jason))
        return pathlib.Path(the_folder["path"])

The functionality to query the Syncthing Events API, extract events, and filter them to decide whether the Arcology ingestfiles Command should be invoked is a pretty straightforward use of urllib.

    def get_one_event_batch(self, since: str|None, timeout: int = 120):
        params = urllib.parse.urlencode(dict(
            since=since,
            timeout=timeout,
        ))
        url = f"http://localhost:8384/rest/events/disk?{params}"
        req = urllib.request.Request(
            url,
            headers = {"X-API-Key": self.key}
        )
        rsp = urllib.request.urlopen(req, None, timeout=timeout)
        jason = json.load(rsp)

        ingest = False
        last_since = None
        for event in jason:
            last_since = event.get("id")
            data = event.get("data", {})
            event_folder_id = data.get("folder", "")
            file_path = pathlib.Path(event.get("data", {}).get("path"))

For each file path, it checks that the file name is not a temporary file, that it ends with .org, and that the change was made in the folder we are monitoring. Anything else is skipped.

            ingest_this = True
            if file_path.name.startswith(".#"):
                logger.debug("skip temp file")
                ingest_this = False
            elif file_path.suffix != ".org":
                logger.debug(f"skip non-org file {file_path.suffix}")
                ingest_this = False
            elif event_folder_id != self.folder_id:
                logger.debug(f"skip unmonitored folder {event_folder_id}")
                ingest_this = False

If a file is deleted, make sure it's removed from the database. Note that this is still liable to miss files; eventually I will want to add a management command to go over the whole file listing and true up the database, but this is good enough for now as long as it's kept running in realtime.

elif data.get("action") == "deleted":
    final_path = self.expanded_path.joinpath(file_path)
    try:
        f = roam.models.File.objects.get(path=final_path)
        logger.debug(f"deleting {final_path}!")
        f.delete()
        ingest_this = False
    except roam.models.File.DoesNotExist:
        pass
# add new failure cases here.
logger.debug(f"proc {last_since} {event_folder_id}, ingest? {ingest}, ingest_this? {ingest_this}: {json.dumps(event)}")
if ingest_this == True:
    ingest = ingest_this
    logger.debug(f"{event}")
if ingest:
call_command('ingestfiles', self.expanded_path)
return last_since

The command returns the last event ID of the request which is used to paginate future requests made in the loop.
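For reference, a single entry from the /rest/events/disk API looks roughly like the structure below, shown as the Python dict json.load produces; the values are illustrative and the fields are trimmed to the ones used above:

# Illustrative shape of one disk event as consumed by get_one_event_batch;
# Syncthing returns more fields than are shown here.
example_event = {
    "id": 1234,                     # becomes `since` for the next poll
    "type": "LocalChangeDetected",
    "data": {
        "folder": "org",                        # compared against self.folder_id
        "path": "arcology/interfaces.org",      # joined onto the share root
        "action": "modified",                   # "deleted" triggers DB cleanup
    },
}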

DONE make sure we are able to handle deletions here…

  • State "DONE" from "NEXT" [2024-03-13 Wed 15:37]

Create Base Sites

This needs to be run to define the base sites when the server DB is created. Eventually these need to be specified outside the code, however, so that a user can extend the thing with new sites and domains more easily.

python manage.py seed

from django.core.management.base import BaseCommand, CommandError
from django.conf import settings
import json

import arcology.models
import roam.models
import generators.models

import logging
logger = logging.getLogger(__name__)

class Command(BaseCommand):
    help = 'Seed site objects, etc.'
    output_transaction = True
    requires_migrations_checks = True

    def handle(self, *args, **kwargs):
        generator_roles = generators.models.GeneratorRole.get_or_create_many(settings.ARROYO_ROLES)

        sites_path = settings.BASE_DIR / "arcology/settings/sites.json"
        with open(sites_path, "r") as f:
            sites = json.load(f)

        for site in sites:
            the_site = arcology.models.Site.objects.create(
                key=site["key"],
                title=site["title"],
                css_file=site["css_file"],
                link_color=site["link_color"],
            )
            for domain in site["domains"]:
                the_site.sitedomain_set.create(domain=domain)
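For reference, the loop above expects sites.json to hold a list of site objects; a hypothetical entry (the values are invented for illustration) would load into a structure like:

# Hypothetical sites.json contents, shown as the Python structure json.load()
# returns; the keys mirror the fields read in the seed loop, the values are made up.
sites = [
    {
        "key": "arcology",
        "title": "The Arcology",
        "css_file": "arcology.css",
        "link_color": "#5555ff",
        "domains": ["arcology.example.com"],
    },
]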