From 91a332ca539418290af5b60fbf31f618b6446b67 Mon Sep 17 00:00:00 2001 From: Neale Pickett Date: Thu, 25 Aug 2022 10:17:25 -0600 Subject: [PATCH] Refactor. CD maybe working? --- .vscode/settings.json | 1 + Dockerfile | 13 +- README.md | 7 ++ doc/architecture.md | 54 ++++++++ src/cd.py | 86 +++++++------ src/dvd.py | 277 ++++++++++++++++++++---------------------- src/encoder.py | 44 +++---- src/mediahandler.py | 6 + src/reader.py | 61 ++++------ src/state.py | 25 ---- src/statuser.py | 6 +- src/sucker.py | 10 +- src/worker.py | 25 ++++ 13 files changed, 329 insertions(+), 286 deletions(-) create mode 100644 doc/architecture.md create mode 100644 src/mediahandler.py delete mode 100644 src/state.py create mode 100644 src/worker.py diff --git a/.vscode/settings.json b/.vscode/settings.json index 000940a..8a392e7 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -10,6 +10,7 @@ "gnudb", "newfn", "RDONLY", + "cdparanoia", "TTITLE" ] } \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index b5e232f..1ddb9ac 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,24 +7,17 @@ RUN true \ && sed -i 's/main$/main contrib non-free/' /etc/apt/sources.list \ && apt-get -y update \ && DEBIAN_FRONTEND=noninteractive apt-get --no-install-recommends -y install \ - ffmpeg \ - handbrake-cli libavcodec-extra \ - abcde eyed3 \ - glyrc setcd eject \ dvdbackup \ libdvd-pkg libdvdcss2 \ + handbrake-cli libavcodec-extra \ + cd-discid cdparanoia lame \ python3 \ - cowsay \ + python3-slugify \ && true RUN dpkg-reconfigure libdvd-pkg RUN true \ && DEBIAN_FRONTEND=noninteractive apt-get --no-install-recommends -y install \ - lame \ - busybox \ - jq \ - procps \ - moreutils \ cowsay COPY src/* /app/ diff --git a/README.md b/README.md index a4a5459..dc49b62 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,13 @@ At the time I'm writing this README, it will: * ~~Rip audio CDs, look them up in cddb, encode them to VBR MP3, then tag them.~~ A rewrite broke this; I plan to fix it soon. * Rip video DVDs, transcode them to mkv +## Requirements + +* HandBrakeCLI +* cdparanoia +* cd-discid +* + ## How To Run This You need a place to store your stuff. diff --git a/doc/architecture.md b/doc/architecture.md new file mode 100644 index 0000000..26ca313 --- /dev/null +++ b/doc/architecture.md @@ -0,0 +1,54 @@ +# Web Server + +There is one web server, +which provides static content, +and a single entrypoint for dynamic state information. + +The static content is some HTML and JavaScript, +which the browser runs to pull the dynamic state, +and update the page with current status of everything. + + +# Workers + +There are at least two Workers: +a Reader and an Encoder. +Each Worker runs in its own thread, +and can do its job without interfering with another Worker. + +## Readers + +Readers monitor a device for media. +Right now, those devices are always CD-ROM drives. +As soon as media is inserted, +a MediaHandler is created to scan and then copy it. + +## Encoders + +Encoders wait for jobs to show up, +and then they re-invoke a MediaHandler to encode everything in that job. + + +# MediaHandlers + +MediaHandlers have a work directory, +where they store all their stuff. +They have the following stages of execution: + +1. *scan* the media to figure out its title, list of tracks, and other metadata +2. *copy* the media to the work directory +3. *encode* the work directory into the desired format (eg. MP3, MKV) +4. *clean* the work directory + +Before each step, +state is read out of the work directory. + +During each step, +a MediaHandler continually updates its Worker with a completion percentage. +This is passed up to the Web Server's dynamic state. + +After each step, +a MediaHandler updates its state, +which is stored on disk. +The only way to communicate state between execution stages is by writing to disk. +This provides some tolerance of job interruption, power loss, etc. diff --git a/src/cd.py b/src/cd.py index 5158f27..d47c32c 100644 --- a/src/cd.py +++ b/src/cd.py @@ -13,7 +13,7 @@ SECOND = 1 MINUTE = 60 * SECOND HOUR = 60 * MINUTE -def read(device, status): +def scan(state, device): # Get disc ID p = subprocess.run( [ @@ -24,7 +24,7 @@ def read(device, status): capture_output=True, ) discid = p.stdout.strip() - status["discid"] = discid + state["discid"] = discid # Look it up in cddb email = os.environ.get("EMAIL") # You should really set this variable, tho @@ -42,23 +42,24 @@ def read(device, status): # We're expected to be automatic here, # so just use the first one. for k in ("title", "artist", "genre", "year", "tracks"): - status[k] = disc[k] + state[k] = disc[k] else: now = time.strftime("%Y-%m-%dT%H%M%S") num_tracks = int(discid.split()[1]) - status["title"] = "Unknown CD - %s" % now - status["tracks"] = ["Track %02d" % i for i in range(num_tracks)] + state["title"] = "Unknown CD - %s" % now + state["tracks"] = ["Track %02d" % i for i in range(num_tracks)] -def rip(device, status, directory): + +def copy(state, device, directory): # cdparanoia reports completion in samples # use discid duration to figure out total number of samples - duration = int(status["discid"].split()[-1]) * SECOND # disc duration in seconds + duration = int(state["discid"].split()[-1]) * SECOND # disc duration in seconds total_samples = duration * (75 / SECOND) * 1176 # 75 sectors per second, 1176 samples per sector - status["total_samples"] = total_samples + state["total_samples"] = total_samples track_num = 1 - for track_name in status["tracks"]: - logging.debug("Ripping track %d of %d", track_num, len(status["tracks"])) + for track_name in state["tracks"]: + logging.debug("Ripping track %d of %d", track_num, len(state["tracks"])) p = subprocess.Popen( [ "cdparanoia", @@ -76,16 +77,17 @@ def rip(device, status, directory): line = line.strip() if line.startswith("##: -2"): samples = int(line.split()[-1]) - status["complete"] = samples / total_samples + yield samples / total_samples track_num += 1 -def encode(status, directory): + +def encode(state, directory): track_num = 1 - durations = [int(d) for d in status["discid"].split()[2:-1]] + durations = [int(d) for d in state["discid"].split()[2:-1]] total_duration = sum(durations) encoded_duration = 0 - for track_name in status["tracks"]: + for track_name in state["tracks"]: logging.debug("Encoding track %d (%s)" % (track_num, track_name)) duration = durations[track_num-1] argv = [ @@ -94,15 +96,15 @@ def encode(status, directory): "--nohist", "--disptime", "1", "--preset", "standard", - "--tl", status["title"], - "--tn", "%d/%d" % (track_num, len(status["tracks"])), + "--tl", state["title"], + "--tn", "%d/%d" % (track_num, len(state["tracks"])), ] - if status.get("artist"): - argv.extend(["--ta", status["artist"]]) - if status.get("genre"): - argv.extend(["--tg", status["genre"]]) - if status.get("year"): - argv.extend(["--ty", status["year"]]) + if state.get("artist"): + argv.extend(["--ta", state["artist"]]) + if state.get("genre"): + argv.extend(["--tg", state["genre"]]) + if state.get("year"): + argv.extend(["--ty", state["year"]]) if track_name: argv.extend(["--tt", track_name]) outfn = "%02d - %s.mp3" % (track_num, track_name) @@ -122,34 +124,38 @@ def encode(status, directory): p = line.split("(")[1] p = p.split("%")[0] pct = int(p) / 100 - status["complete"] = (encoded_duration + (duration * pct)) / total_duration - print(status["complete"]) + yield (encoded_duration + (duration * pct)) / total_duration encoded_duration += duration track_num += 1 + +def clean(state, directory): + pass + + if __name__ == "__main__": import pprint import sys import json - if len(sys.argv) > 1: - directory = sys.argv[1] - fn = os.path.join(directory, "status.json") - f = open(fn) - status = json.load(f) - else: - logging.basicConfig(level=logging.DEBUG) - status = {} - read("/dev/sr0", status) - pprint.pprint(status) + logging.basicConfig(level=logging.DEBUG) - directory = os.path.join(".", status["title"]) - os.makedirs(directory, exist_ok=True) - rip("/dev/sr0", status, directory) - pprint.pprint(status) + state = {} + scan(state, "/dev/sr0") + pprint.pprint(state) - encode(status, directory) - pprint.pprint(status) + directory = os.path.join(".", state["title"]) + os.makedirs(directory, exist_ok=True) + with open(os.path.join(directory, "state.json"), "w") as f: + json.dump(f, state) + + for pct in copy(state, "/dev/sr0", directory): + sys.stdout.write("Copying: %3d%%\r" % (pct*100)) + pprint.pprint(state) + + for pct in encode(state, directory): + sys.stdout.write("Encoding: %3d%%\r" % (pct*100)) + pprint.pprint(state) # vi: sw=4 ts=4 et ai diff --git a/src/dvd.py b/src/dvd.py index df56d84..73d026c 100644 --- a/src/dvd.py +++ b/src/dvd.py @@ -10,164 +10,153 @@ SECOND = 1 MINUTE = 60 * SECOND HOUR = 60 * MINUTE -class Copier: - def __init__(self, device, status): - self.device = device - self.status = status - self.scan() - - def collect(self, track): - newCollection = [] - for t in self.collection: - if t["length"] == track["length"]: - # If the length is exactly the same, - # assume it's the same track, - # and pick the one with the most stuff. - if len(track["audio"]) < len(t["audio"]): - return - elif len(track["subp"]) < len(t["subp"]): - return - newCollection.append(t) - newCollection.append(track) - self.collection = newCollection - - def scan(self): - self.status["state"] = "scanning" - - self.collection = [] - p = subprocess.run( - [ - "lsdvd", - "-Oy", - "-x", - self.device, - ], - encoding="utf-8", - capture_output=True, - ) - lsdvd = eval(p.stdout[8:]) # s/^lsdvd = // - title = lsdvd["title"] - if title in ('No', 'unknown'): - title = lsdvd["provider_id"] - if title == "$PACKAGE_STRING": - title = "DVD" - now = time.strftime("%Y-%m-%dT%H%M%S") - title = "%s %s" % (title, now) - - # Go through all the tracks, looking for the largest referenced sector. - max_sector = 0 - max_length = 0 - tracks = lsdvd["track"] - for track in tracks: - max_length = max(track["length"], max_length) - for cell in track["cell"]: - max_sector = max(cell["last_sector"], max_sector) - if max_sector == 0: - logging.info("Media size = 0; aborting") - return - - # Make a guess about what's on this DVD. - # We will categories into three types: - # * A feature, which has one track much longer than any other - # * A collection of shows, which has several long tracks, more or less the same lengths - # * Something else - for track in tracks: - if track["length"] / max_length > 0.80: - self.collect(track) - if (max_length < 20 * MINUTE) and (len(self.collection) < len(track) * 0.6): - self.collection = tracks - - self.status["title"] = title - self.status["size"] = max_sector * 2048 # DVD sector size = 2048 - self.status["tracks"] = [(t["ix"], t["length"]) for t in self.collection] +def collect(collection, track): + newCollection = [] + for t in collection: + if t["length"] == track["length"]: + # If the length is exactly the same, + # assume it's the same track, + # and pick the one with the most stuff. + if len(track["audio"]) < len(t["audio"]): + return collection + elif len(track["subp"]) < len(t["subp"]): + return collection + newCollection.append(t) + newCollection.append(track) + return newCollection - def copy(self, directory): - self.status["state"] = "copying" +def scan(state, device): + p = subprocess.run( + [ + "lsdvd", + "-Oy", + "-x", + device, + ], + encoding="utf-8", + capture_output=True, + ) + lsdvd = eval(p.stdout[8:]) # s/^lsdvd = // + title = lsdvd["title"] + if title in ('No', 'unknown'): + title = lsdvd["provider_id"] + if title == "$PACKAGE_STRING": + title = "DVD" + now = time.strftime(r"%Y-%m-%dT%H%M%S") + title = "%s %s" % (title, now) + # Go through all the tracks, looking for the largest referenced sector. + max_sector = 0 + max_length = 0 + tracks = lsdvd["track"] + for track in tracks: + max_length = max(track["length"], max_length) + for cell in track["cell"]: + max_sector = max(cell["last_sector"], max_sector) + if max_sector == 0: + logging.info("Media size = 0; aborting") + return + + # Make a guess about what's on this DVD. + # We will categories into three types: + # * A feature, which has one track much longer than any other + # * A collection of shows, which has several long tracks, more or less the same lengths + # * Something else + collection = [] + for track in tracks: + if track["length"] / max_length > 0.80: + collection = collect(track) + if (max_length < 20 * MINUTE) and (len(collection) < len(track) * 0.6): + collection = tracks + + state["title"] = title + state["size"] = max_sector * 2048 # DVD sector size = 2048 + state["tracks"] = [(t["ix"], t["length"]) for t in collection] + +def copy(state, device, directory): + p = subprocess.Popen( + [ + "dvdbackup", + "--input=" + device, + "--name=" + state["title"], + "--mirror", + "--progress", + ], + encoding="utf-8", + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + cwd=directory, + ) + totalBytes = titleSize = lastTitleSize = 0 + progressRe = re.compile(r"^Copying.*([0-9.]+)/[0-9.]+ (MiB|KiB)") + for line in p.stdout: + line = line.strip() + m = progressRe.search(line) + if m and m[2] == "MiB": + titleSize = float(m[1]) * 1024 * 1024 + elif m and m[2] == "KiB": + titleSize = float(m[1]) * 1024 + if titleSize < lastTitleSize: + totalBytes += lastTitleSize + lastTitleSize = titleSize + yield (totalBytes + titleSize) / state["size"] + + +def encode(state, directory): + title = state["title"] + logging.info("encoding: %s (%s)" % (title, directory)) + + total_length = sum(t[1] for t in state["tracks"]) + finished_length = 0 + for track, length in state["tracks"]: + outfn = "%s-%d.mkv" % (title, track) + tmppath = os.path.join(directory, outfn) + outpath = os.path.join(directory, "..", outfn) p = subprocess.Popen( [ - "dvdbackup", - "--input=" + self.device, - "--name=" + self.status["title"], - "--mirror", - "--progress", + "nice", + "HandBrakeCLI", + "--json", + "--input", "%s/VIDEO_TS" % directory, + "--output", tmppath, + "--title", str(track), + "--native-language", "eng", + "--markers", + "--loose-anamorphic", + "--all-subtitles", + "--all-audio", + "--aencoder", "copy", + "--audio-copy-mask", "aac,ac3,mp3", + "--audio-fallback", "aac", ], encoding="utf-8", stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - cwd=directory, + stderr=None, ) - totalBytes = titleSize = lastTitleSize = 0 - progressRe = re.compile(r"^Copying.*([0-9.]+)/[0-9.]+ (MiB|KiB)") + + # HandBrakeCLI spits out sort of JSON. + # But Python has no built-in way to stream JSON objects. + # Hence this kludge. + progressRe = re.compile(r'^"Progress": ([0-9.]+),') for line in p.stdout: line = line.strip() m = progressRe.search(line) - if m and m[2] == "MiB": - titleSize = float(m[1]) * 1024 * 1024 - elif m and m[2] == "KiB": - titleSize = float(m[1]) * 1024 - if titleSize < lastTitleSize: - totalBytes += lastTitleSize - lastTitleSize = titleSize - self.status["complete"] = (totalBytes + titleSize) / self.status["size"] + if m: + progress = float(m[1]) + complete = (finished_length + progress*length) / total_length + state["complete"] = complete + + finished_length += length + os.rename( + src=tmppath, + dst=outpath, + ) + logging.info("Finished track %d; length %d" % (track, length)) -class Encoder: - def __init__(self, basedir, status): - self.basedir = basedir - self.status = status - - def encode(self, obj): - title = obj["title"] - logging.info("encoding: %s (%s)" % (title, self.basedir)) - - total_length = sum(t[1] for t in obj["tracks"]) - finished_length = 0 - for track, length in obj["tracks"]: - outfn = "%s-%d.mkv" % (title, track) - tmppath = os.path.join(self.basedir, outfn) - outpath = os.path.join(self.basedir, "..", outfn) - p = subprocess.Popen( - [ - "nice", - "HandBrakeCLI", - "--json", - "--input", "%s/VIDEO_TS" % self.basedir, - "--output", tmppath, - "--title", str(track), - "--native-language", "eng", - "--markers", - "--loose-anamorphic", - "--all-subtitles", - "--all-audio", - "--aencoder", "copy", - "--audio-copy-mask", "aac,ac3,mp3", - "--audio-fallback", "aac", - ], - encoding="utf-8", - stdout=subprocess.PIPE, - stderr=None, - ) - - # HandBrakeCLI spits out sort of JSON. - # But Python has no built-in way to stream JSON objects. - # Hence this kludge. - progressRe = re.compile(r'^"Progress": ([0-9.]+),') - for line in p.stdout: - line = line.strip() - m = progressRe.search(line) - if m: - progress = float(m[1]) - complete = (finished_length + progress*length) / total_length - self.status["complete"] = complete - - finished_length += length - os.rename( - src=tmppath, - dst=outpath, - ) - logging.info("Finished track %d; length %d" % (track, length)) +def clean(state, directory): + pass if __name__ == "__main__": diff --git a/src/encoder.py b/src/encoder.py index 9a6b44b..dcfdfb5 100644 --- a/src/encoder.py +++ b/src/encoder.py @@ -1,7 +1,6 @@ #! /usr/bin/python3 import os -import threading import subprocess import glob import os @@ -13,40 +12,41 @@ import re import logging import dvd import cd +import worker -class Encoder(threading.Thread): - def __init__(self, directory=None, **kwargs): +class Encoder(worker.Worker): + def __init__(self, directory=None): self.status = {} self.directory = directory - return super().__init__(**kwargs) + return super().__init__(directory) def run(self): while True: wait = True self.status = {"type": "encoder", "state": "idle"} - for fn in glob.glob(os.path.join(self.directory, "*", "sucker.json")): - fdir = os.path.dirname(fn) - with open(fn) as f: - obj = json.load(f) - self.encode(fdir, obj) + for fn in glob.glob(self.workdir("*", "state.json")): + self.encode(os.path.dirname(fn), obj) wait = False if wait: time.sleep(12) - def encode(self, fdir, obj): + def encode(self, directory, obj): self.status["state"] = "encoding" - self.status["title"] = obj["title"] - if obj["type"] == "audio": - self.encode_audio(fdir, obj) - else: - self.encode_video(fdir, obj) - shutil.rmtree(fdir) - - def encode_audio(self, fdir, obj): - cd.encode(obj, fdir) - def encode_video(self, fdir, obj): - enc = dvd.Encoder(fdir, self.status) - enc.encode(obj) + state = self.read_state(directory) + self.status["title"] = state["title"] + + if state["video"]: + media = dvd + else: + media = cd + + logging.info("Encoding %s (%s)" % (directory, state["title"])) + for pct in media.encode(state, directory): + self.status["complete"] = pct + + media.clean(state, directory) + + logging.info("Finished encoding") # vi: sw=4 ts=4 et ai diff --git a/src/mediahandler.py b/src/mediahandler.py new file mode 100644 index 0000000..eba4a60 --- /dev/null +++ b/src/mediahandler.py @@ -0,0 +1,6 @@ +class MediaHandler: + def __init__(self, basedir, state): + self.basedir = basedir + self.state = state + + def \ No newline at end of file diff --git a/src/reader.py b/src/reader.py index 8e2bcd4..1aa7782 100644 --- a/src/reader.py +++ b/src/reader.py @@ -1,7 +1,6 @@ #! /usr/bin/python3 import os -import threading import subprocess import time import re @@ -9,8 +8,10 @@ import fcntl import traceback import json import logging +import slugify import dvd import cd +import worker CDROM_DRIVE_STATUS = 0x5326 CDS_NO_INFO = 0 @@ -28,25 +29,21 @@ CDS_DATA_2 = 102 CDROM_LOCKDOOR = 0x5329 CDROM_EJECT = 0x5309 -class Reader(threading.Thread): - def __init__(self, device, directory=None, **kwargs): +class Reader(worker.Worker): + def __init__(self, device, directory): + super().__init__(device) self.device = device - self.directory = directory - self.status = { - "type": "reader", - "state": "idle", - "device": self.device, - } + self.status["type"] = "reader" + self.status["device"] = device self.complete = 0 self.staleness = 0 self.drive = None logging.info("Starting reader on %s" % self.device) - return super().__init__(**kwargs) def reopen(self): if (self.staleness > 15) or not self.drive: if self.drive: - self.drive.close() + os.close(self.drive) self.drive = None try: self.drive = os.open(self.device, os.O_RDONLY | os.O_NONBLOCK) @@ -69,9 +66,9 @@ class Reader(threading.Thread): rv = fcntl.ioctl(self.drive, CDROM_DISC_STATUS) try: if rv == CDS_AUDIO: - self.handle_audio() + self.handle(false) elif rv in [CDS_DATA_1, CDS_DATA_2]: - self.handle_data() + self.handle(true) else: logging.info("Can't handle disc type %d" % rv) except Exception as e: @@ -96,32 +93,26 @@ class Reader(threading.Thread): logging.error("Ejecting: %v" % e) time.sleep(i * 5) - # XXX: rename this to something like "write_status" - def finished(self, **kwargs): - self.status["state"] = "finished read" - fn = os.path.join(self.directory, self.status["title"], "sucker.json") - newfn = fn + ".new" - with open(newfn, "w") as fout: - json.dump(obj=self.status, fp=fout) - os.rename(src=newfn, dst=fn) - - def handle_audio(self): - self.status["video"] = False - + def handle(self, video): + self.status["video"] = video self.status["state"] = "reading" - cd.read(self.device, self.status) + + state = {} + state["video"] = video + if video: + media = cd + else: + media = dvd + + media.scan(state, self.device) + self.status["title"] = state["title"] + subdir = slugify.slugify(state["title"]) - directory = os.path.join(self.directory, status["title"]) - os.makedirs(directory, exist_ok=True) self.status["state"] = "copying" - cd.copy(self.device, self.status, self.directory) - self.finished() # XXX: rename this to something like "write_status" + for pct in media.copy(device, self.workdir(subdir)): + self.status["complete"] = pct + self.write_state(subdir, state) - def handle_data(self): - self.status["video"] = True - src = dvd.Copier(self.device, self.status) - src.copy(self.directory) - self.finished() # vi: sw=4 ts=4 et ai diff --git a/src/state.py b/src/state.py deleted file mode 100644 index 66b5b1a..0000000 --- a/src/state.py +++ /dev/null @@ -1,25 +0,0 @@ -#! /usr/bin/python3 - -import json - -class State(dict): - def __init__(self, path): - super().__init__() - self.path = path - self.read() - - def read(self): - try: - f = open(self.path) - except FileNotFoundError: - return - obj = json.load(f) - f.close() - - for k in obj: - self[k] = obj[k] - - def write(self): - f = open(self.path, "w") - json.dump(self, f) - f.close() diff --git a/src/statuser.py b/src/statuser.py index 66f9dcb..9c8daf9 100644 --- a/src/statuser.py +++ b/src/statuser.py @@ -7,17 +7,17 @@ import time import os class Statuser(threading.Thread): - def __init__(self, workers, directory=None, **kwargs): + def __init__(self, workers, directory): self.workers = workers self.directory = directory self.status = {} - super().__init__(**kwargs) + super().__init__(daemon=True) def run(self): while True: self.status["finished"] = { "video": glob.glob(os.path.join(self.directory, "*.mkv")), - "audio": glob.glob(os.path.join(self.directory, "*/*/*.mp3")), + "audio": glob.glob(os.path.join(self.directory, "*/*.mp3")), } self.status["workers"] = [w.status for w in self.workers] time.sleep(12) diff --git a/src/sucker.py b/src/sucker.py index 0c91b74..ab01fb1 100644 --- a/src/sucker.py +++ b/src/sucker.py @@ -33,13 +33,9 @@ def main(): logging.basicConfig(level=logging.INFO) - readers = [] - for d in args.drive: - readers.append(reader.Reader(d, directory=args.incoming, daemon=True)) - encoders = [] - for i in range(1): - encoders.append(encoder.Encoder(directory=args.incoming, daemon=True)) - st = statuser.Statuser(readers + encoders, directory=args.incoming, daemon=True) + readers = [reader.Reader(d, args.incoming) for d in args.drive] + encoders = [encoder.Encoder(args.incoming) for i in range(1)] + st = statuser.Statuser(readers + encoders, args.incoming) [w.start() for w in readers + encoders] st.start() diff --git a/src/worker.py b/src/worker.py new file mode 100644 index 0000000..26df748 --- /dev/null +++ b/src/worker.py @@ -0,0 +1,25 @@ +import threading +import os +import json + +class Worker(threading.Thread): + def __init__(self, directory, **kwargs): + self.directory = directory + self.status = { + "state": "idle", + "directory": directory, + } + + kwargs["daemon"] = True + return super().__init__(**kwargs) + + def workdir(self, *path): + return os.path.join(self.directory, *path) + + def write_state(self, subdir, state): + with open(self.workdir(subdir, "state.json"), "w") as f: + json.dump(f, state) + + def read_state(self, subdir): + with open(self.workdir(subdir, "state.json")) as f: + return json.load(f)