
Arcology Automated Database Builder

Arcology Updates the Database with Emacs

This should be a pretty simple state machine that runs a single Emacs instance to generate the DB, but it never works out that way.

  • inotify will send events to a queue
  • an instance will start five seconds after an inotify event
  • an incoming inotify event restarts the five-second countdown
  • the next instance cannot start until three minutes after the previous run
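The timing rules above can be sketched as a small pure function. It replays a list of event timestamps (in seconds) and reports when builds would start. This is a simplified model, not the builder itself. It assumes runs take no time. The helper names `next_run` and `run_start_times` are hypothetical. It hard-codes the five-second and three-minute defaults, which the real code reads from db_generation_debounce and db_generation_cooldown. It measures the cooldown from the previous run, as is_cooled_down does.

```python
from typing import List, Optional


def next_run(pending: float, last_run: Optional[float], debounce: float, cooldown: float) -> float:
    """Earliest start for a build whose most recent triggering change was at `pending`."""
    earliest = pending + debounce
    if last_run is not None:
        # cooldown is measured from the previous run
        earliest = max(earliest, last_run + cooldown)
    return earliest


def run_start_times(events: List[float], debounce: float = 5.0, cooldown: float = 180.0) -> List[float]:
    """Replay event timestamps (seconds) and return when builds would start.

    Simplifying assumption: runs are instantaneous.
    """
    runs: List[float] = []
    last_run: Optional[float] = None
    pending: Optional[float] = None  # latest change not yet built
    for t in sorted(events):
        if pending is not None:
            due = next_run(pending, last_run, debounce, cooldown)
            if due < t:
                # the countdown expired before this event arrived, so a build ran
                runs.append(due)
                last_run = due
        # either way, this event (re)starts the countdown
        pending = t
    if pending is not None:
        runs.append(next_run(pending, last_run, debounce, cooldown))
    return runs


if __name__ == "__main__":
    print(run_start_times([0, 3, 6]))     # [11.0]
    print(run_start_times([0, 20, 100]))  # [5.0, 185.0]
```

With events at 0, 20, and 100 seconds, the first change builds at 5 seconds. The later edits land inside the cooldown and collapse into a single build once it expires at 185 seconds.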

This thing is started with: nix-shell --run "arcology-inotify" &

The DatabaseBuilder module encapsulates the transition logic of the server.

pytransitions can model all of this, helpfully.

from typing import Optional, List
from datetime import datetime
import logging
import asyncio
import os

from pathlib import Path

import transitions  # for transitions.core.MachineError in process_inotify
from transitions.extensions.asyncio import AsyncMachine
import asyncinotify as ain

from arcology.batch import build_command
from arcology.config import get_settings

logger = logging.getLogger('arcology-arroyo')

class DatabaseBuilder(object):
    last_touch: Optional[datetime] = None
    last_run: Optional[datetime] = None

    logger = logging.getLogger('arcology-arroyo')
    debounce: int = get_settings().db_generation_debounce
    cooldown: int = get_settings().db_generation_cooldown
    stoked: bool = False

    inotify: ain.Inotify
    watched_paths: List[Path]

    def __init__(self):
        # one list per instance; a class-level [] would be shared by every builder
        self.watched_paths = []
        self.machine = AsyncMachine(model=self, states=DatabaseBuilder.states, initial='idle', queued=True)
        self.inotify = ain.Inotify()
        self.prep_inotify()

        <<transitions>>

    <<make-inotify>>

    <<process_inotify>>

    <<loop-fn>>

    <<classmethods>>

Emacs Runner Finite State Machine

pytransitions can render the state machine with Graphviz, even if the Arcology can't render it yet:

import arcology.builder
from transitions.extensions import GraphMachine

class GraphMachineRenderer():
  def __init__(self):
    self.machine = GraphMachine(
      model=arcology.builder.DatabaseBuilder,
      states=arcology.builder.DatabaseBuilder.states,
      initial='idle',
      queued=True,
      use_pygraphviz=False
    )
    <<transitions>>

m = GraphMachineRenderer().machine
arcology.builder.draw(m, './data/fsm-graph.png.png')
return "./data/fsm-graph.png.png"

[Figure: rendered state machine graph, data/fsm-graph.png.png]

States

states = ['idle', 'waiting', 'running', 'cooldown']
  • idle : no events
  • waiting : an inotify event has occurred; waiting for the debounce window to pass
  • running : Emacs is running
  • cooldown : after Emacs runs, a wait period keeps it from running constantly while files are being written
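The four states and the add_transition calls in this section can be summarized as a lookup table. The following is a stdlib-only sketch, not pytransitions itself. `InvalidTrigger`, `TRANSITIONS` and `fire` are hypothetical names. The sketch mimics two pytransitions behaviours. A trigger with no transition from the current state raises, as MachineError does by default. A failed condition leaves the state unchanged.

```python
from typing import Callable, Dict, Optional, Tuple


class InvalidTrigger(Exception):
    """Hypothetical stand-in for pytransitions' MachineError."""


# (trigger, source state) -> (destination state, guard condition name or None),
# mirroring the add_transition calls in this file
TRANSITIONS: Dict[Tuple[str, str], Tuple[str, Optional[str]]] = {
    ("touch", "idle"): ("waiting", None),
    ("touch", "waiting"): ("waiting", None),
    ("touch", "cooldown"): ("cooldown", None),
    ("start", "waiting"): ("running", "is_debounced"),
    ("done", "running"): ("cooldown", None),
    ("ready", "cooldown"): ("idle", "is_cooled_down"),
}


def fire(state: str, trigger: str, guards: Dict[str, Callable[[], bool]]) -> str:
    """Return the state after `trigger`; a failed guard leaves the state unchanged."""
    try:
        dest, guard = TRANSITIONS[(trigger, state)]
    except KeyError:
        # no transition defined for this trigger in this state
        raise InvalidTrigger(f"{trigger!r} is not valid from {state!r}") from None
    if guard is not None and not guards[guard]():
        return state
    return dest
```

For example, `fire("waiting", "start", {"is_debounced": lambda: False})` stays in waiting. `fire("running", "touch", {})` raises, which mirrors the case that process_inotify swallows by catching MachineError.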

Transitions

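from pathlib import Path
from transitions.extensions import GraphMachine

def draw(m: GraphMachine, dest: Path):
  # get_graph() only exists on GraphMachine; 'dot' is the Graphviz layout program
  m.get_graph().draw(str(dest), prog='dot')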
touch transitions into waiting for the debounce period

Touching a file starts the waiting period and updates an internal timestamp tracking the last time a file was touched.

self.machine.add_transition('touch', ['waiting', 'idle'], 'waiting', before='update_last_touch')

Touching can also happen in the cooldown state. That doesn't leave cooldown. Instead it marks the machine as "stoked". Once the cooldown expires and the machine returns to idle, the loop touches it again, and Emacs starts after the usual debounce.

self.machine.add_transition('touch', 'cooldown', 'cooldown', before=['update_last_touch', 'stoke_machine'])
def update_last_touch(self):
    now = datetime.now()
    self.logger.info("been touched... old: %s now: %s", self.last_touch, now)
    self.last_touch = now

async def stoke_machine(self):
    self.logger.info("stokin'!")
    self.stoked = True
start transitions into running to start Emacs after the debounce window

Entering the running state is only valid if we've passed a debounce window configurable in the Arcology BaseSettings Configuration Class.

It uses the build_command from arcology.batch.

self.machine.add_transition('start', 'waiting', 'running', conditions='is_debounced', after='start_emacs')
def is_debounced(self):
    now = datetime.now()
    if self.last_touch is None:
        return False
    if (now - self.last_touch).total_seconds() > self.debounce:
        return True
    return False

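async def start_emacs(self):
    self.logger.info("starting emacs")
    self.stoked = False
    await self.do_run()
    # the machine is queued, so 'done' runs once this 'start' transition finishes
    await self.done()

async def do_run(self):
    # create_subprocess_shell returns as soon as the child is spawned;
    # wait() keeps the machine in 'running' until Emacs actually exits
    proc = await asyncio.create_subprocess_shell(build_command())
    await proc.wait()
    return proc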
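The difference between spawning Emacs and waiting for it decides how long the machine stays in running. The following is a minimal sketch. It uses create_subprocess_exec with the current Python interpreter as a hypothetical stand-in for build_command(). Right after the spawn, returncode is still None. Only wait() suspends until the child exits.

```python
import asyncio
import sys
from typing import Optional, Tuple


async def spawn_then_wait() -> Tuple[Optional[int], int]:
    # stand-in for build_command(): a child Python process that sleeps briefly
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import time; time.sleep(0.2)"
    )
    # the await above returns once the child is spawned, so it is still running
    before = proc.returncode
    # wait() suspends until the child exits and returns its exit status
    after = await proc.wait()
    return before, after


if __name__ == "__main__":
    print(asyncio.run(spawn_then_wait()))  # (None, 0)
```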
done transitions into the cooldown state

Entering the cooldown state sets last_run and resets last_touch. This cooldown state

self.machine.add_transition('done', 'running', 'cooldown', before='reset_state')
def reset_state(self):
    self.last_run = datetime.now()
    self.last_touch = None
ready finally transitions back into the initial idle state

Transitioning out of cooldown into idle is conditional on a cooldown window, which is configurable in the Arcology BaseSettings Configuration Class.

self.machine.add_transition('ready', 'cooldown', 'idle', conditions='is_cooled_down')
def is_cooled_down(self):
    # never run before: nothing to cool down from
    if self.last_run is None:
        return True
    td = (datetime.now() - self.last_run).total_seconds()
    if td > self.cooldown:
        return True
    self.logger.debug("Still cooling down. delta: %s", td)
    return False
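Both guards read datetime.now() directly, which makes them awkward to unit test. The following sketch puts the same comparisons into free functions with the clock passed in. The signatures are hypothetical and are not the builder's methods.

```python
from datetime import datetime, timedelta
from typing import Optional


def is_debounced(last_touch: Optional[datetime], now: datetime, debounce: float) -> bool:
    # no touch recorded since the last run: nothing to build yet
    if last_touch is None:
        return False
    # strictly greater, matching the comparison in the builder
    return (now - last_touch).total_seconds() > debounce


def is_cooled_down(last_run: Optional[datetime], now: datetime, cooldown: float) -> bool:
    # never run before: nothing to cool down from
    if last_run is None:
        return True
    return (now - last_run).total_seconds() > cooldown


if __name__ == "__main__":
    t0 = datetime(2024, 1, 1, 12, 0, 0)
    print(is_debounced(t0, t0 + timedelta(seconds=6), debounce=5))      # True
    print(is_cooled_down(t0, t0 + timedelta(minutes=2), cooldown=180))  # False
```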

Wrapping the State Machine with asyncinotify

All of this ultimately feeds into the DatabaseBuilder class.

process_inotify does the bulk of the work. It makes sure the file is an org file, then tries to transition to the waiting state through touch(). This can fail, but that's fine, as this async function is wrapped with a timeout and will be retried repeatedly until it can perform the state transition.

async def process_inotify(self):
    event = await self.inotify.get()

    # only react to org files; skip undo-tree history, temp files and Emacs lock files
    if not event.name:
        return
    file_suffixes = event.name.suffixes
    if ".org" not in file_suffixes:
        return
    if ".~undo-tree~" in file_suffixes:
        return
    if ".tmp" in file_suffixes:
        return
    if event.name.name.startswith(".#"):
        return
    # `mask & flags` tests for the bits (`mask | flag` is always truthy);
    # a new file can appear through CREATE or by being renamed into place (MOVED_TO)
    if event.mask & (ain.Mask.CREATE | ain.Mask.MOVED_TO):
        path = event.path
        logger.info("A new file to watch: %s", path)
        self.add_watch(path)

    logger.info("has event... %s", event.path)
    logger.debug("state: %s", self.state)
    logger.debug("last_touch: %s", (datetime.now() - (self.last_touch or datetime.now())).total_seconds())
    logger.debug("last_run: %s", (datetime.now() - (self.last_run or datetime.now())).total_seconds())

    try:
        # raises MachineError in 'running', which has no 'touch' transition
        await self.touch()
    except transitions.core.MachineError:
        pass
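The name filters at the top of process_inotify can be pulled out into a pure function and checked against a few sample names. The sketch below also shows why the mask check needs `&` rather than `|`. `Mask` here is a hypothetical stand-in for asyncinotify's Mask, built on enum.IntFlag with the bit values from inotify(7). `is_interesting` and `is_new_file` are hypothetical names.

```python
import enum
from pathlib import PurePath


class Mask(enum.IntFlag):
    """Hypothetical stand-in for asyncinotify's Mask, using inotify(7) bit values."""
    MODIFY = 0x002
    MOVED_TO = 0x080
    CREATE = 0x100
    DELETE = 0x200


def is_interesting(name: PurePath) -> bool:
    """Apply the same name filters as process_inotify."""
    suffixes = name.suffixes
    if ".org" not in suffixes:
        return False
    # undo-tree history and temp files carry these extra suffixes
    if ".~undo-tree~" in suffixes or ".tmp" in suffixes:
        return False
    # Emacs lock files look like ".#note.org"
    return not name.name.startswith(".#")


def is_new_file(mask: Mask) -> bool:
    # `&` tests whether either bit is set; `mask | Mask.CREATE` is never zero
    return bool(mask & (Mask.CREATE | Mask.MOVED_TO))
```

Note that PurePath(".#index.org").suffixes is ['.org'], so the lock-file check really is needed on top of the suffix checks.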

Note that process_inotify seems to have a bug, or an interaction with the way Syncthing persists files. Of course everything creates new files instead of writing to existing ones. Unless I track all the watched paths myself, there seems to be a race condition if a file is updated while the Emacs process is running. I'm not sure how to tackle this except to scream from the mountains or set up a lax restart policy in the systemd unit… 🤔

There is also a noweb hook to insert

Setting up File Watch Rules

inotify doesn't have recursive watches. A watch on a directory reports changes to the directory itself and to the files directly inside it, but not to anything in its subdirectories. os.walk comes in handy here to add a watch on every directory in the tree. We also use it to skip certain directories which may contain org files.

This doesn't restrict the watches to org files, though. That filtering happens in process_inotify.
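The walk-and-prune approach can be sketched on its own. The hypothetical `dirs_to_watch` helper only lists the directories instead of registering inotify watches. `demo` builds a throwaway tree to show that `.git` is skipped. Pruning has to mutate dirnames in place, either by slice assignment or remove(). Rebinding the name would have no effect on the walk.

```python
import os
import tempfile
from pathlib import Path
from typing import Iterable, List


def dirs_to_watch(root: Path, ignored: Iterable[str] = (".git",)) -> List[Path]:
    """List every directory under root that would get an inotify watch."""
    skip = set(ignored)
    found: List[Path] = []
    for dirpath, dirnames, _filenames in os.walk(root):
        found.append(Path(dirpath))
        # os.walk is top-down by default, so pruning dirnames in place
        # stops it from descending into ignored directories
        dirnames[:] = [d for d in dirnames if d not in skip]
    return found


def demo() -> List[str]:
    # throwaway tree: notes/sub should be watched, .git/objects should not
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        (root / "notes" / "sub").mkdir(parents=True)
        (root / ".git" / "objects").mkdir(parents=True)
        return sorted(p.relative_to(root).as_posix() for p in dirs_to_watch(root))


if __name__ == "__main__":
    print(demo())  # ['.', 'notes', 'notes/sub']
```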

INOTIFY_MASK = ain.Mask.MODIFY | ain.Mask.CREATE | ain.Mask.DELETE | ain.Mask.MOVE

def add_watch(self, path: Path):
    # os.walk yields str while inotify events yield Path; normalize so the dedupe check works
    path = Path(path)
    if path not in self.watched_paths:
        self.inotify.add_watch(path, self.INOTIFY_MASK)
        self.watched_paths.append(path)
    else:
        logger.debug("ignoring already watched %s", path)

def prep_inotify(self):
    IGNORED_DIRS = {'.git'}
    IGNORED_FILES = set()

    arcology_dir = get_settings().arcology_directory
    root = Path(arcology_dir).expanduser().resolve()
    logger.info("walking arcology_dir %s", root)

    for dirpath, dirs, files in os.walk(root):
        self.add_watch(dirpath)
        # prune in place so os.walk doesn't descend into ignored directories
        dirs[:] = [d for d in dirs if d not in IGNORED_DIRS]
        files[:] = [f for f in files if f not in IGNORED_FILES]

    logger.info("constructed inotify with %s watches", len(self.watched_paths))

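The loop is pretty simple. The FSM and the inotify object are created in __init__, so the loop just waits up to five seconds at a time for inotify events to come in. After each wait it also tries to advance the state machine:

  • it touches the machine again if it was stoked during cooldown and has since returned to idle
  • it starts Emacs once the debounce window has passed
  • it resets the machine to idle once the cooldown has elapsed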

async def loop(self):
    await self.touch() # ? starts with an index operation instead

    while True:
        try:
            await asyncio.wait_for(self.process_inotify(), 5)
        except asyncio.TimeoutError:
            logger.debug("Timeout")

        # is it stoked?
        if self.state == 'idle' and self.stoked:
            await self.touch()

        # try to start after debounce
        if self.state == 'waiting':
            await self.start()

        # try to start after cooldown
        if self.state == 'cooldown':
            await self.ready()
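The shape of this loop can be shown with an asyncio.Queue standing in for the Inotify object. Each tick waits a bounded time for an event, then does housekeeping either way. This is a sketch: `poll` and `demo` are hypothetical names, and the timeout is shortened from five seconds to keep it quick.

```python
import asyncio
from typing import List


async def poll(queue: "asyncio.Queue[str]", ticks: int, timeout: float) -> List[str]:
    """Wait up to `timeout` seconds per tick for an event, recording what happened."""
    log: List[str] = []
    for _ in range(ticks):
        try:
            name = await asyncio.wait_for(queue.get(), timeout)
            log.append(f"event {name}")
        except asyncio.TimeoutError:
            log.append("timeout")
        # the builder's loop nudges the FSM here (touch / start / ready)
    return log


async def demo() -> List[str]:
    queue: "asyncio.Queue[str]" = asyncio.Queue()
    queue.put_nowait("index.org")
    return await poll(queue, ticks=2, timeout=0.05)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['event index.org', 'timeout']
```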

Simple standalone starter

The DatabaseBuilder runs in an asyncio event loop. Eventually it'll probably be started within Arcology FastAPI to drastically simplify operating it. For now, it's operated by invoking this script, packaged in the Arcology Poetry Pyproject with the command wrapper arcology-inotify. Passing --once runs Emacs a single time instead of starting the loop.

import asyncio
import logging
import sys

from arcology.builder import DatabaseBuilder

logging.basicConfig(level=logging.DEBUG)
logging.getLogger('transitions').setLevel(logging.INFO)
logging.getLogger('arcology-arroyo').setLevel(logging.INFO)

db = DatabaseBuilder()

def start():
    print(sys.argv)
    if "--once" in sys.argv:
        asyncio.run(db.do_run())
    else:
        asyncio.run(db.loop())

if __name__ == "__main__":
    start()