133 lines
5.7 KiB
Python
133 lines
5.7 KiB
Python
# [[file:../../../interfaces.org::*Ingest files on-demand using Syncthing][Ingest files on-demand using Syncthing:2]]
|
|
from django.core.management.base import BaseCommand, CommandError
|
|
from django.core.management import call_command
|
|
from django.conf import settings
|
|
|
|
import json
|
|
import pathlib
|
|
import urllib.parse
|
|
import urllib.request
|
|
|
|
import polling
|
|
|
|
import logging
|
|
logger = logging.getLogger(__name__)
|
|
# logger.setLevel(logging.DEBUG)
|
|
|
|
import roam.models
|
|
# Ingest files on-demand using Syncthing:2 ends here
|
|
|
|
# [[file:../../../interfaces.org::*Ingest files on-demand using Syncthing][Ingest files on-demand using Syncthing:3]]
|
|
class Command(BaseCommand):
    """Watch the local Syncthing instance and re-ingest org files on change.

    The command resolves which Syncthing folder to monitor (either from
    ``--folder_id`` or by matching ``--source_dir`` against the folders
    configured in Syncthing), then long-polls the Syncthing disk-event API
    forever.  Whenever a batch contains a change to an ``.org`` file in the
    monitored folder, the ``ingestfiles`` command is run on the folder's
    directory; deletions remove the matching ``roam.models.File`` row.
    """

    help = 'Watch the local syncthing instance for changes to ingest'
    requires_migrations_checks = True

    # Base URL of the Syncthing REST API that every request is sent to.
    api_base_url = "http://localhost:8384"
    # Seconds polling.poll waits before calling the API again after a batch
    # that yielded no event id.
    poll_step = 30
    # Extra seconds granted to the HTTP socket on top of the long-poll
    # timeout sent to Syncthing, so the client does not give up at the very
    # moment the server is about to answer an idle long-poll.
    socket_grace = 10

    def add_arguments(self, parser):
        """Register the ``--source_dir`` and ``--folder_id`` options."""
        parser.add_argument('-s', '--source_dir', type=str, default="~/org")
        parser.add_argument('-f', '--folder_id', type=str,
                            help="Find directory to watch by this Syncthing Folder instead of source_dir.")

    def handle(self, *args, **kwargs):
        """Resolve the folder to watch, then poll Syncthing events forever.

        Raises:
            CommandError: if ``settings.SYNCTHING_KEY`` is unset or empty, or
                if the requested folder cannot be found in Syncthing's
                configuration.
        """
        # A plain ``assert`` would be stripped under ``python -O`` and a
        # missing setting would surface as an AttributeError; report both
        # cases the way Django expects management commands to fail.
        self.key = getattr(settings, "SYNCTHING_KEY", None)
        if not self.key:
            raise CommandError("settings.SYNCTHING_KEY must be set to the Syncthing API key")

        folder_id = kwargs.get("folder_id")
        if folder_id:
            self.folder_id = folder_id
            # Expand "~" here as well so ``expanded_path`` means the same
            # thing in both branches.
            self.expanded_path = self.get_path_for_folder_id(self.folder_id).expanduser()
            logger.info(f"fetched folder ID {self.folder_id} AKA {self.expanded_path} from syncthing API")
        else:
            self.expanded_path = pathlib.Path(kwargs['source_dir']).expanduser()
            self.folder_id = self.get_folder_id_for_path(self.expanded_path)
            logger.info(f"fetched folder ID {self.folder_id} AKA {self.expanded_path} from syncthing API")
# Ingest files on-demand using Syncthing:3 ends here

# [[file:../../../interfaces.org::*Ingest files on-demand using Syncthing][Ingest files on-demand using Syncthing:4]]
        since = None
        logger.info("fetching events from API now.")
        while True:
            # polling.poll keeps calling the target until it returns a truthy
            # value (its default success check), so a batch without events
            # (None) never overwrites the last event id we have seen.
            since = polling.poll(
                lambda: self.get_one_event_batch(since),
                step=self.poll_step,
                poll_forever=True,
            )
# Ingest files on-demand using Syncthing:4 ends here

# [[file:../../../interfaces.org::*Ingest files on-demand using Syncthing][Ingest files on-demand using Syncthing:5]]
    def _api_get(self, endpoint, params=None, timeout=None):
        """GET ``endpoint`` from the Syncthing REST API and return parsed JSON.

        Args:
            endpoint: path below ``api_base_url``, starting with ``/``.
            params: optional mapping that is URL-encoded into the query string.
            timeout: socket timeout in seconds; when ``None`` the process-wide
                default socket timeout applies.
        """
        url = f"{self.api_base_url}{endpoint}"
        if params:
            url = f"{url}?{urllib.parse.urlencode(params)}"
        req = urllib.request.Request(url, headers={"X-API-Key": self.key})
        open_kwargs = {} if timeout is None else {"timeout": timeout}
        # The context manager closes the HTTP response once it is parsed.
        with urllib.request.urlopen(req, **open_kwargs) as rsp:
            return json.load(rsp)

    def _get_folders(self):
        """Return the folder configuration list reported by Syncthing."""
        return self._api_get("/rest/config/folders")

    def get_folder_id_for_path(self, path: pathlib.Path) -> str:
        """Return the id of the Syncthing folder whose path is ``path``.

        Paths are compared as expanded ``pathlib.Path`` objects so that a
        trailing slash or a leading ``~`` in Syncthing's configuration does
        not prevent a match.

        Raises:
            CommandError: if no configured folder has that path.
        """
        wanted = pathlib.Path(path).expanduser()
        for folder in self._get_folders():
            if pathlib.Path(folder["path"]).expanduser() == wanted:
                return folder["id"]
        raise CommandError(f"no Syncthing folder is configured with path {wanted}")

    def get_path_for_folder_id(self, folder_id: str) -> pathlib.Path:
        """Return the configured path of the Syncthing folder ``folder_id``.

        The path is returned exactly as Syncthing reports it (not expanded).

        Raises:
            CommandError: if no configured folder has that id.
        """
        for folder in self._get_folders():
            if folder["id"] == folder_id:
                return pathlib.Path(folder["path"])
        raise CommandError(f"no Syncthing folder is configured with id {folder_id}")
# Ingest files on-demand using Syncthing:5 ends here

# [[file:../../../interfaces.org::*Ingest files on-demand using Syncthing][Ingest files on-demand using Syncthing:6]]
    def get_one_event_batch(self, since: int | str | None, timeout: int = 120):
        """Fetch one batch of disk events and react to the relevant ones.

        Args:
            since: id of the last event already processed, or ``None`` to
                leave the ``since`` parameter out of the request.
            timeout: long-poll timeout in seconds sent to Syncthing.

        Returns:
            The ``id`` of the last event in the batch, or ``None`` when the
            batch was empty.
        """
        # Leave ``since`` out entirely when there is no previous event id;
        # urlencode would otherwise send the literal text "None".
        params = {}
        if since is not None:
            params["since"] = since
        params["timeout"] = timeout
        jason = self._api_get(
            "/rest/events/disk",
            params=params,
            timeout=timeout + self.socket_grace,
        )

        ingest = False
        last_since = None
        for event in jason:
            last_since = event.get("id")
            data = event.get("data") or {}
            event_folder_id = data.get("folder", "")
            # An event without a path becomes Path("."), which has no ".org"
            # suffix and is therefore skipped below instead of raising.
            file_path = pathlib.Path(data.get("path") or "")
# Ingest files on-demand using Syncthing:6 ends here

# [[file:../../../interfaces.org::*Ingest files on-demand using Syncthing][Ingest files on-demand using Syncthing:7]]
            ingest_this = True
            if file_path.name.startswith(".#"):
                logger.debug("skip temp file")
                ingest_this = False
            elif file_path.suffix != ".org":
                logger.debug(f"skip non-org file {file_path.suffix}")
                ingest_this = False
            elif event_folder_id != self.folder_id:
                logger.debug(f"skip unmonitored folder {event_folder_id}")
                ingest_this = False
# Ingest files on-demand using Syncthing:7 ends here

# [[file:../../../interfaces.org::*Ingest files on-demand using Syncthing][Ingest files on-demand using Syncthing:8]]
            elif data.get("action") == "deleted":
                # A deletion never needs a re-ingest, whether or not the
                # file was known to the database.
                ingest_this = False
                final_path = self.expanded_path.joinpath(file_path)
                try:
                    f = roam.models.File.objects.get(path=final_path)
                except roam.models.File.DoesNotExist:
                    logger.debug(f"deleted file {final_path} is not in the database, nothing to remove")
                else:
                    logger.debug(f"deleting {final_path}!")
                    f.delete()
# Ingest files on-demand using Syncthing:8 ends here

# [[file:../../../interfaces.org::*Ingest files on-demand using Syncthing][Ingest files on-demand using Syncthing:9]]
            # add new failure cases here.
            # Only serialise the event when debug logging is actually on.
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f"proc {last_since} {event_folder_id}, ingest? {ingest}, ingest_this? {ingest_this}: {json.dumps(event)}")
            if ingest_this:
                ingest = True
                logger.debug(f"{event}")

        # One ingest run per batch, however many events in it qualified.
        if ingest:
            call_command('ingestfiles', self.expanded_path)
        return last_since
|
|
# Ingest files on-demand using Syncthing:9 ends here
|