# arcology-fastapi/arcology/builder.py
#
# 152 lines
# 5.3 KiB
# Python

import asyncio
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Optional, List

import asyncinotify as ain
from transitions.core import MachineError
from transitions.extensions.asyncio import AsyncMachine

from arcology.batch import build_command
from arcology.config import get_settings
logger = logging.getLogger('arcology-arroyo')
class DatabaseBuilder(object):
    """Debounced, inotify-driven trigger for the database build command.

    The instance is the model of an ``AsyncMachine`` with four states:

    * ``idle``     -- nothing is pending.
    * ``waiting``  -- a change was seen; the build starts once more than
      ``debounce`` seconds have passed since the most recent touch.
    * ``running``  -- the shell command from ``build_command()`` is executing.
    * ``cooldown`` -- a build finished; touches arriving now only set
      ``stoked`` so that another build is requested after returning to
      ``idle``, which happens once more than ``cooldown`` seconds have passed
      since ``last_run``.

    ``loop()`` drives the machine: it waits for inotify events and, at least
    every five seconds, retries the time-gated ``start``/``ready`` triggers.
    """

    states = ['idle', 'waiting', 'running', 'cooldown']
    INOTIFY_MASK = ain.Mask.MODIFY | ain.Mask.CREATE | ain.Mask.DELETE | ain.Mask.MOVE

    last_touch: Optional[datetime] = None
    last_run: Optional[datetime] = None
    logger = logging.getLogger('arcology-arroyo')
    # Both thresholds are compared against ``timedelta.total_seconds()``.
    debounce: int = get_settings().db_generation_debounce
    cooldown: int = get_settings().db_generation_cooldown
    stoked: bool = False
    inotify: ain.Inotify
    watched_paths: List[Path]

    def __init__(self):
        # Per-instance list: a class-level ``[]`` would be shared by every
        # builder, so a second instance would skip watches it never added.
        self.watched_paths = []
        self.machine = AsyncMachine(model=self, states=DatabaseBuilder.states, initial='idle', queued=True)
        self.inotify = ain.Inotify()
        self.prep_inotify()
        self.machine.add_transition('touch', ['waiting', 'idle'], 'waiting', before='update_last_touch')
        self.machine.add_transition('touch', 'cooldown', 'cooldown', before=['update_last_touch', 'stoke_machine'])
        self.machine.add_transition('start', 'waiting', 'running', conditions='is_debounced', after='start_emacs')
        self.machine.add_transition('done', 'running', 'cooldown', before='reset_state')
        self.machine.add_transition('ready', 'cooldown', 'idle', conditions='is_cooled_down')

    def add_watch(self, path: Path) -> None:
        """Register ``path`` with inotify unless it is already watched.

        ``path`` is normalised to ``Path`` first so that the ``str`` values
        produced by ``os.walk`` and the ``Path`` values carried by inotify
        events compare equal in the duplicate check.
        """
        path = Path(path)
        if path in self.watched_paths:
            logger.debug("ignoring already watched %s", path)
            return
        self.inotify.add_watch(path, self.INOTIFY_MASK)
        self.watched_paths.append(path)

    def prep_inotify(self) -> None:
        """Watch every directory below the configured ``arcology_directory``.

        Directories named ``.git`` are watched nowhere below their parent:
        they are pruned from the walk so ``os.walk`` never descends into them.
        """
        ignored_dirs = {'.git'}
        arcology_dir = get_settings().arcology_directory
        root = Path(arcology_dir).expanduser().resolve()
        logger.info("walking arcology_dir %s", root)
        for dirpath, dirs, _files in os.walk(root):
            self.add_watch(Path(dirpath))
            # In-place slice assignment is what tells os.walk to skip them.
            dirs[:] = [d for d in dirs if d not in ignored_dirs]
        logger.info("constructed inotify with %s watches", len(self.watched_paths))

    async def process_inotify(self) -> None:
        """Consume one inotify event and ``touch`` the machine if relevant.

        Events are ignored when they carry no file name, when the name has no
        ``.org`` suffix, when it has a ``.~undo-tree~`` or ``.tmp`` suffix, or
        when it starts with ``.#``.
        """
        event = await self.inotify.get()
        if not event.name:
            return
        file_suffixes = event.name.suffixes
        if ".org" not in file_suffixes:
            return
        if ".~undo-tree~" in file_suffixes or ".tmp" in file_suffixes:
            return
        if event.name.name.startswith(".#"):
            return

        # Test the flag with ``&``: ``mask | CREATE`` is always truthy and
        # would try to add a watch for every event, including deletions.
        if event.mask & ain.Mask.CREATE:
            path = event.path
            logger.info("A new file to watch: %s", path)
            self.add_watch(path)

        logger.info("has event... %s", event.path)
        logger.debug("state: %s", self.state)
        logger.debug("last_touch: %s", (datetime.now() - (self.last_touch or datetime.now())).total_seconds())
        logger.debug("last_run: %s", (datetime.now() - (self.last_run or datetime.now())).total_seconds())
        try:
            await self.touch()
        except MachineError:
            # No ``touch`` transition is defined from ``running``; dropping
            # the event there is deliberate, but leave a trace of it.
            logger.debug("touch ignored in state %s", self.state)

    async def loop(self) -> None:
        """Run forever: process events and advance the time-gated transitions."""
        await self.touch()  # ? starts with an index operation instead
        while True:
            try:
                await asyncio.wait_for(self.process_inotify(), 5)
            except asyncio.TimeoutError:
                logger.debug("Timeout")

            # is it stoked?
            if self.state == 'idle' and self.stoked:
                await self.touch()
            # try to start after debounce
            if self.state == 'waiting':
                await self.start()
            # try to start after cooldown
            if self.state == 'cooldown':
                await self.ready()

    def update_last_touch(self) -> None:
        """Record the current time as the most recent touch."""
        now = datetime.now()
        self.logger.info("been touched... old: %s now: %s", self.last_touch, now)
        self.last_touch = now

    async def stoke_machine(self) -> None:
        """Remember that a change arrived during cooldown."""
        self.logger.info("stokin'!")
        self.stoked = True

    def is_debounced(self) -> bool:
        """Return True when more than ``debounce`` seconds passed since the last touch."""
        if self.last_touch is None:
            return False
        elapsed = (datetime.now() - self.last_touch).total_seconds()
        return elapsed > self.debounce

    async def start_emacs(self) -> None:
        """Run the build, then fire ``done`` to enter cooldown."""
        self.logger.info("starting emacs")
        self.stoked = False
        await self.do_run()
        await self.done()

    async def do_run(self):
        """Run ``build_command()`` in a shell and wait for it to exit.

        Returns the finished ``asyncio`` subprocess object. Waiting here is
        what keeps the machine in ``running`` for the duration of the build;
        without it ``done`` would fire as soon as the process was spawned.
        """
        process = await asyncio.create_subprocess_shell(build_command())
        returncode = await process.wait()
        if returncode != 0:
            self.logger.warning("build command exited with status %s", returncode)
        return process

    def reset_state(self) -> None:
        """Stamp ``last_run`` with the current time and clear ``last_touch``."""
        self.last_run = datetime.now()
        self.last_touch = None

    def is_cooled_down(self) -> bool:
        """Return True when no run is recorded or ``cooldown`` seconds have passed since it."""
        if self.last_run is None:
            return True
        elapsed = (datetime.now() - self.last_run).total_seconds()
        if elapsed > self.cooldown:
            return True
        self.logger.debug("Still cooling down. delta: %s", elapsed)
        return False
from pathlib import Path
def draw(m: AsyncMachine, dest: Path):
    """Write the graph of machine ``m`` to ``dest`` using the ``dot`` program."""
    graph = m.get_graph()
    target = str(dest)
    graph.draw(target, prog='dot')