arcology/syncthonk/management/commands/watchsync.py

133 lines
5.7 KiB
Python

# [[file:../../../interfaces.org::*Ingest files on-demand using Syncthing][Ingest files on-demand using Syncthing:2]]
from django.core.management.base import BaseCommand, CommandError
from django.core.management import call_command
from django.conf import settings
import json
import pathlib
import urllib.parse
import urllib.request
import polling
import logging
logger = logging.getLogger(__name__)
# logger.setLevel(logging.DEBUG)
import roam.models
# Ingest files on-demand using Syncthing:2 ends here
# [[file:../../../interfaces.org::*Ingest files on-demand using Syncthing][Ingest files on-demand using Syncthing:3]]
class Command(BaseCommand):
    """Watch one Syncthing folder and ingest org files when they change.

    The command long-polls the local Syncthing REST API
    (``/rest/events/disk``) for disk events. Qualifying events (non-temp
    ``.org`` files in the watched folder) cause the ``ingestfiles``
    management command to run over the watched directory. A "deleted" event
    whose file has a ``roam.models.File`` row deletes that row instead.
    """

    help = 'Watch the local syncthing instance for changes to ingest'
    requires_migrations_checks = True

    # Base URL of the Syncthing REST API queried by this command.
    syncthing_url = "http://localhost:8384"
    # Seconds added on top of the long-poll timeout when setting the HTTP
    # socket timeout, so the client outlasts Syncthing's own wait.
    socket_timeout_grace = 15

    def add_arguments(self, parser):
        """Register --source_dir and --folder_id (folder_id takes precedence)."""
        parser.add_argument('-s', '--source_dir', type=str, default="~/org",
                            help="Directory to watch; must be the path of a Syncthing folder.")
        parser.add_argument('-f', '--folder_id', type=str,
                            help="Find directory to watch by this Syncthing Folder instead of source_dir.")

    def handle(self, *args, **kwargs):
        """Resolve the folder to watch, then process Syncthing events forever.

        Sets ``self.key``, ``self.folder_id`` and ``self.expanded_path``,
        which the event handling below relies on.

        Raises:
            CommandError: if ``settings.SYNCTHING_KEY`` is missing or empty,
                or the requested folder is not in Syncthing's config.
        """
        self.key = getattr(settings, "SYNCTHING_KEY", None)
        # Explicit error rather than ``assert``: asserts vanish under ``python -O``.
        if not self.key:
            raise CommandError("settings.SYNCTHING_KEY must be set to the Syncthing API key")
        folder_id = kwargs.get("folder_id")
        if folder_id:
            self.folder_id = folder_id
            self.expanded_path = self.get_path_for_folder_id(folder_id)
        else:
            self.expanded_path = pathlib.Path(kwargs['source_dir']).expanduser()
            self.folder_id = self.get_folder_id_for_path(self.expanded_path)
        logger.info(f"fetched folder ID {self.folder_id} AKA {self.expanded_path} from syncthing API")
# Ingest files on-demand using Syncthing:3 ends here
# [[file:../../../interfaces.org::*Ingest files on-demand using Syncthing][Ingest files on-demand using Syncthing:4]]
        since = None
        logger.info("fetching events from API now.")
        while True:
            # polling.poll re-invokes the target every `step` seconds until it
            # returns a truthy value (its default success check), i.e. until a
            # batch contained at least one event; that event ID becomes the
            # cursor for the next request.
            since = polling.poll(
                self.get_one_event_batch,
                step=30,
                args=(since,),
                poll_forever=True,
            )
# Ingest files on-demand using Syncthing:4 ends here
# [[file:../../../interfaces.org::*Ingest files on-demand using Syncthing][Ingest files on-demand using Syncthing:5]]
    def _api_get(self, endpoint: str, params: dict | None = None, timeout: float | None = None):
        """GET ``endpoint`` from the Syncthing REST API and return the decoded JSON.

        Sends ``self.key`` in the ``X-API-Key`` header; ``params`` are
        URL-encoded into the query string. When ``timeout`` is None no
        timeout is passed to ``urlopen``, leaving the socket default in effect.
        """
        url = f"{self.syncthing_url}{endpoint}"
        if params:
            url = f"{url}?{urllib.parse.urlencode(params)}"
        req = urllib.request.Request(url, headers={"X-API-Key": self.key})
        extra = {} if timeout is None else {"timeout": timeout}
        # Context manager so the HTTP response is closed promptly.
        with urllib.request.urlopen(req, **extra) as rsp:
            return json.load(rsp)

    def _get_folders(self):
        """Return the folder list from Syncthing's ``/rest/config/folders``."""
        return self._api_get("/rest/config/folders")

    def get_folder_id_for_path(self, path: pathlib.Path) -> str:
        """Return the ID of the Syncthing folder whose path is ``path``.

        Both sides are compared as ``Path`` objects after ``expanduser`` so a
        trailing slash or a ``~`` in Syncthing's config does not prevent a match.

        Raises:
            CommandError: if no configured folder has that path.
        """
        target = pathlib.Path(path).expanduser()
        for folder in self._get_folders():
            if pathlib.Path(folder["path"]).expanduser() == target:
                return folder["id"]
        raise CommandError(f"no Syncthing folder is configured for path {target}")

    def get_path_for_folder_id(self, folder_id: str) -> pathlib.Path:
        """Return the (user-expanded) path of the Syncthing folder ``folder_id``.

        Raises:
            CommandError: if no configured folder has that ID.
        """
        for folder in self._get_folders():
            if folder["id"] == folder_id:
                return pathlib.Path(folder["path"]).expanduser()
        raise CommandError(f"no Syncthing folder has ID {folder_id!r}")
# Ingest files on-demand using Syncthing:5 ends here
# [[file:../../../interfaces.org::*Ingest files on-demand using Syncthing][Ingest files on-demand using Syncthing:6]]
    def get_one_event_batch(self, since: int | None, timeout: int = 120) -> int | None:
        """Fetch and process one batch of Syncthing disk events.

        Args:
            since: ID of the last event already handled; only newer events are
                requested. None on the first call, in which case the parameter
                is omitted from the request.
            timeout: long-poll timeout in seconds passed to Syncthing.

        Returns:
            The ``id`` of the last event in the batch, or None if the batch
            was empty.

        Side effects: may delete ``roam.models.File`` rows for deleted files
        and may run the ``ingestfiles`` command on ``self.expanded_path``.
        """
        params = {} if since is None else {"since": since}
        params["timeout"] = timeout
        # The socket timeout must outlast the server-side long-poll wait;
        # with equal values the read can time out just as Syncthing answers.
        jason = self._api_get(
            "/rest/events/disk",
            params,
            timeout=timeout + self.socket_timeout_grace,
        )
        ingest = False
        last_since = None
        for event in jason:
            last_since = event.get("id")
            data = event.get("data", {})
            event_folder_id = data.get("folder", "")
            # `or ""` so an event without a path is skipped as a non-org file
            # below instead of raising TypeError from pathlib.Path(None).
            file_path = pathlib.Path(data.get("path") or "")
# Ingest files on-demand using Syncthing:6 ends here
# [[file:../../../interfaces.org::*Ingest files on-demand using Syncthing][Ingest files on-demand using Syncthing:7]]
            ingest_this = True
            if file_path.name.startswith(".#"):
                logger.debug("skip temp file")
                ingest_this = False
            elif file_path.suffix != ".org":
                logger.debug(f"skip non-org file {file_path.suffix}")
                ingest_this = False
            elif event_folder_id != self.folder_id:
                logger.debug(f"skip unmonitored folder {event_folder_id}")
                ingest_this = False
# Ingest files on-demand using Syncthing:7 ends here
# [[file:../../../interfaces.org::*Ingest files on-demand using Syncthing][Ingest files on-demand using Syncthing:8]]
            elif data.get("action") == "deleted":
                final_path = self.expanded_path.joinpath(file_path)
                try:
                    f = roam.models.File.objects.get(path=final_path)
                except roam.models.File.DoesNotExist:
                    # No matching row: the event stays eligible and will
                    # trigger an ingest. NOTE(review): presumably an
                    # intentional fallback for paths not stored verbatim — confirm.
                    pass
                else:
                    logger.debug(f"deleting {final_path}!")
                    f.delete()
                    ingest_this = False
# Ingest files on-demand using Syncthing:8 ends here
# [[file:../../../interfaces.org::*Ingest files on-demand using Syncthing][Ingest files on-demand using Syncthing:9]]
            # add new failure cases here.
            logger.debug(f"proc {last_since} {event_folder_id}, ingest? {ingest}, ingest_this? {ingest_this}: {json.dumps(event)}")
            ingest = ingest or ingest_this
            logger.debug(f"{event}")
        if ingest:
            # Run ingestfiles once per batch, over the whole watched
            # directory, rather than once per qualifying event.
            call_command('ingestfiles', self.expanded_path)
        return last_since
# Ingest files on-demand using Syncthing:9 ends here