media-sucker/src/cd.py

156 lines
4.5 KiB
Python
Raw Normal View History

2022-08-23 22:06:44 -06:00
#! /usr/bin/python3
import subprocess
import time
import logging
import re
import os
import socket
import io
import gnudb
SECOND = 1
MINUTE = 60 * SECOND
HOUR = 60 * MINUTE
def read(device, status):
# Get disc ID
p = subprocess.run(
[
"cd-discid",
device,
],
encoding="utf-8",
capture_output=True,
)
2022-08-25 10:17:25 -06:00
discid = p.stdout.strip()
2022-08-23 22:06:44 -06:00
status["discid"] = discid
# Look it up in cddb
email = os.environ.get("EMAIL") # You should really set this variable, tho
if not email:
user = "user"
try:
user = os.getlogin()
except OSError:
pass
email = "%s@%s" % (user, socket.gethostname())
db_server = gnudb.Server(email)
disc = db_server.bestguess(discid)
if disc:
# There was a hit! Hooray!
# We're expected to be automatic here,
# so just use the first one.
for k in ("title", "artist", "genre", "year", "tracks"):
status[k] = disc[k]
else:
now = time.strftime("%Y-%m-%dT%H%M%S")
num_tracks = int(discid.split()[1])
status["title"] = "Unknown CD - %s" % now
2022-08-25 10:17:25 -06:00
status["tracks"] = ["Track %02d" % i for i in range(num_tracks)]
2022-08-23 22:06:44 -06:00
def rip(device, status, directory):
# cdparanoia reports completion in samples
# use discid duration to figure out total number of samples
duration = int(status["discid"].split()[-1]) * SECOND # disc duration in seconds
total_samples = duration * (75 / SECOND) * 1176 # 75 sectors per second, 1176 samples per sector
2022-08-25 10:17:25 -06:00
status["total_samples"] = total_samples
2022-08-23 22:06:44 -06:00
track_num = 1
for track_name in status["tracks"]:
logging.debug("Ripping track %d of %d", track_num, len(status["tracks"]))
p = subprocess.Popen(
[
"cdparanoia",
"--stderr-progress",
"--force-cdrom-device", device,
"--batch",
str(track_num),
],
cwd = directory,
stderr = subprocess.PIPE,
encoding = "utf-8",
)
for line in p.stderr:
line = line.strip()
if line.startswith("##: -2"):
samples = int(line.split()[-1])
status["complete"] = samples / total_samples
track_num += 1
def encode(status, directory):
track_num = 1
2022-08-25 10:17:25 -06:00
durations = [int(d) for d in status["discid"].split()[2:-1]]
total_duration = sum(durations)
encoded_duration = 0
2022-08-23 22:06:44 -06:00
for track_name in status["tracks"]:
2022-08-25 10:17:25 -06:00
logging.debug("Encoding track %d (%s)" % (track_num, track_name))
duration = durations[track_num-1]
2022-08-23 22:06:44 -06:00
argv = [
"lame",
2022-08-25 10:17:25 -06:00
"--brief",
"--nohist",
"--disptime", "1",
2022-08-23 22:06:44 -06:00
"--preset", "standard",
2022-08-25 10:17:25 -06:00
"--tl", status["title"],
2022-08-23 22:06:44 -06:00
"--tn", "%d/%d" % (track_num, len(status["tracks"])),
]
2022-08-25 10:17:25 -06:00
if status.get("artist"):
argv.extend(["--ta", status["artist"]])
if status.get("genre"):
argv.extend(["--tg", status["genre"]])
if status.get("year"):
argv.extend(["--ty", status["year"]])
2022-08-23 22:06:44 -06:00
if track_name:
2022-08-25 10:17:25 -06:00
argv.extend(["--tt", track_name])
outfn = "%02d - %s.mp3" % (track_num, track_name)
2022-08-23 22:06:44 -06:00
else:
2022-08-25 10:17:25 -06:00
outfn = "%02d.mp3" % track_num
2022-08-23 22:06:44 -06:00
argv.append("track%02d.cdda.wav" % track_num)
argv.append(outfn)
p = subprocess.Popen(
argv,
cwd = directory,
2022-08-25 10:17:25 -06:00
stderr = subprocess.PIPE,
2022-08-23 22:06:44 -06:00
encoding = "utf-8",
)
2022-08-25 10:17:25 -06:00
for line in p.stderr:
line = line.strip()
if "%)" in line:
p = line.split("(")[1]
p = p.split("%")[0]
pct = int(p) / 100
status["complete"] = (encoded_duration + (duration * pct)) / total_duration
print(status["complete"])
encoded_duration += duration
2022-08-23 22:06:44 -06:00
track_num += 1
if __name__ == "__main__":
import pprint
2022-08-25 10:17:25 -06:00
import sys
import json
2022-08-23 22:06:44 -06:00
2022-08-25 10:17:25 -06:00
if len(sys.argv) > 1:
directory = sys.argv[1]
fn = os.path.join(directory, "status.json")
f = open(fn)
status = json.load(f)
else:
logging.basicConfig(level=logging.DEBUG)
status = {}
read("/dev/sr0", status)
pprint.pprint(status)
2022-08-23 22:06:44 -06:00
2022-08-25 10:17:25 -06:00
directory = os.path.join(".", status["title"])
os.makedirs(directory, exist_ok=True)
rip("/dev/sr0", status, directory)
pprint.pprint(status)
2022-08-23 22:06:44 -06:00
encode(status, directory)
pprint.pprint(status)
# vi: sw=4 ts=4 et ai