tags: python media ffmpeg vibe

Another exercise in using Gemini.

# First Version

```py
#!/usr/bin/env python3

prompt = r"""
A python wrapper script for ffprobe. For example:
get_video_info -f bitrate,vbitrate,abitrate,duration,title a.mp4 b.mp4
writing output to the console, optionally outputting json or csv. (--json or --csv)
-f is short for --fields.
"""

prompt = r"""
could you include querying for resolution, frame rate, metadata like artist,
album, etc (so it can be used for mp3 and m4a files too)
"""

prompt = r"""
could the output be a table with the filename first, and headers
"""

prompt = r"""
When scanning, could it print out the current filename, then use \r to
overwrite the line, until done, then clear the line and print the output.
Also when scanning, include a counter e.g. 4/15 showing progress.
"""

prompt = r"""
for things like bitrate, an option to enable human readable (pick a letter
for the short version, but not -h since that's help)
"""

import subprocess
import json
import csv
import sys
import argparse
import os


def get_probe_data(file_path):
    """Run ffprobe on file_path and return the parsed JSON, or None on any failure."""
    cmd = [
        'ffprobe', '-v', 'quiet',
        '-print_format', 'json',
        '-show_format', '-show_streams',
        file_path
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return json.loads(result.stdout)
    except Exception:
        return None


def format_human_size(value, suffix='B', is_bitrate=False):
    """Converts bytes or bits to human readable strings (KB, MB, GB / kbps, Mbps)."""
    try:
        num = float(value)
        # Bitrates are usually base-10 (1000), file sizes base-2 (1024)
        base = 1000.0 if is_bitrate else 1024.0
        units = ['', 'K', 'M', 'G', 'T']
        if is_bitrate:
            suffix = 'bps'
        for unit in units:
            if abs(num) < base:
                return f"{num:.2f} {unit}{suffix}".strip()
            num /= base
        return f"{num:.2f} P{suffix}"
    except (ValueError, TypeError):
        return "N/A"


def format_duration(seconds):
    """Format a seconds value as H:MM:SS (or MM:SS when under an hour)."""
    try:
        sec = float(seconds)
        mins, sec = divmod(int(sec), 60)
        hrs, mins = divmod(mins, 60)
        return f"{hrs:02d}:{mins:02d}:{sec:02d}" if hrs > 0 else f"{mins:02d}:{sec:02d}"
    except (ValueError, TypeError):
        return "N/A"


def extract_fields(data, requested_fields, human_readable=False):
    """Flatten ffprobe JSON into {field: str} for the requested field names."""
    if not data:
        return None
    fmt = data.get('format', {})
    streams = data.get('streams', [])
    tags = fmt.get('tags', {})
    v_stream = next((s for s in streams if s['codec_type'] == 'video'), {})
    a_stream = next((s for s in streams if s['codec_type'] == 'audio'), {})

    fr_raw = v_stream.get('avg_frame_rate', '0/0')
    try:
        num, den = map(int, fr_raw.split('/'))
        fps = round(num / den, 2) if den != 0 else "N/A"
    except (ValueError, ZeroDivisionError):
        fps = "N/A"

    # Raw Data Mapping
    raw_vals = {
        'bitrate': fmt.get('bit_rate'),
        'vbitrate': v_stream.get('bit_rate'),
        'abitrate': a_stream.get('bit_rate'),
        'size': fmt.get('size'),
        'duration': fmt.get('duration'),
        'width': v_stream.get('width', 'N/A'),
        'height': v_stream.get('height', 'N/A'),
        'fps': fps,
        'title': tags.get('title') or tags.get('TITLE', 'N/A'),
        'artist': tags.get('artist') or tags.get('ARTIST', 'N/A'),
        'album': tags.get('album') or tags.get('ALBUM', 'N/A'),
    }

    # Final logic processing
    res = {}
    for f in requested_fields:
        val = raw_vals.get(f, "N/A")
        if human_readable and val != "N/A":
            if f in ['bitrate', 'vbitrate', 'abitrate']:
                val = format_human_size(val, is_bitrate=True)
            elif f == 'size':
                val = format_human_size(val)
        if f == 'duration':
            val = format_duration(val)
        elif f == 'resolution':
            val = f"{raw_vals['width']}x{raw_vals['height']}" if raw_vals['width'] != 'N/A' else "N/A"
        res[f] = str(val)
    return res


def print_table(data, headers):
    """Print rows as a padded text table with an upper-cased header line."""
    if not data:
        return
    widths = {h: len(h) for h in headers}
    for row in data:
        for h in headers:
            widths[h] = max(widths[h], len(str(row[h])))
    header_line = " ".join(h.upper().ljust(widths[h]) for h in headers)
    print(header_line)
    print("-" * len(header_line))
    for row in data:
        print(" ".join(str(row[h]).ljust(widths[h]) for h in headers))


def main():
    parser = argparse.ArgumentParser(description="FFprobe Wrapper")
    parser.add_argument('files', nargs='+')
    parser.add_argument('-f', '--fields', default="duration,size,bitrate,resolution")
    parser.add_argument('-H', '--human', action='store_true', help="Human readable sizes/bitrates")
    group = parser.add_mutually_exclusive_group()
    group.add_argument('--json', action='store_true')
    group.add_argument('--csv', action='store_true')
    args = parser.parse_args()

    requested_fields = args.fields.split(',')
    all_headers = ['filename'] + requested_fields
    results = []

    for i, file_path in enumerate(args.files, 1):
        base_name = os.path.basename(file_path)
        display_name = (base_name[:30] + '..') if len(base_name) > 32 else base_name
        # Progress goes to stderr so it never pollutes --json/--csv output on stdout
        sys.stderr.write(f"\rScanning [{i}/{len(args.files)}]: {display_name}".ljust(60))
        sys.stderr.flush()
        raw_data = get_probe_data(file_path)
        if raw_data:
            info = extract_fields(raw_data, requested_fields, args.human)
            info['filename'] = base_name
            results.append(info)

    # Clear the progress line before emitting results
    sys.stderr.write("\r" + " " * 70 + "\r")
    sys.stderr.flush()

    if args.json:
        print(json.dumps(results, indent=2))
    elif args.csv:
        writer = csv.DictWriter(sys.stdout, fieldnames=all_headers)
        writer.writeheader()
        writer.writerows(results)
    else:
        print_table(results, all_headers)


if __name__ == "__main__":
    main()
```

# Asking For It Packaged In Classes

```py
#!/usr/bin/env python3
# following from vinf

prompt = r"""
Could you organise this as a class which can be extended by subclassing.
So it handles stuff like reading and parsing the json into a data structure,
and for good measure including what's available via stat(), so that I can
write methods to parse and return stuff in a subclass.
Then a front end class which takes a backend class and handles command line
and output stuff.
""" import subprocess import json import csv import sys import argparse import os from datetime import datetime # --- BACKEND LAYER --- class FFProbeEngine: """Handles the raw data acquisition from ffprobe and system stat.""" def __init__(self, file_path): self.file_path = file_path self.raw_data = self._probe() self.stats = self._get_stats() def _probe(self): """Runs ffprobe and returns the parsed JSON.""" cmd = [ 'ffprobe', '-v', 'quiet', '-print_format', 'json', '-show_format', '-show_streams', self.file_path ] try: result = subprocess.run(cmd, capture_output=True, text=True, check=True) return json.loads(result.stdout) except Exception: return None def _get_stats(self): """Returns standard OS file statistics.""" try: return os.stat(self.file_path) except OSError: return None def get_metadata(self): """ Base method to return a flat dict of fields. Meant to be overridden or extended by subclasses. """ if not self.raw_data: return {} fmt = self.raw_data.get('format', {}) tags = fmt.get('tags', {}) # Default basic extraction return { 'filename': os.path.basename(self.file_path), 'size': int(fmt.get('size', 0)), 'duration': fmt.get('duration'), 'bitrate': fmt.get('bit_rate'), 'title': tags.get('title') or tags.get('TITLE', 'N/A'), 'mtime': datetime.fromtimestamp(self.stats.st_mtime).isoformat() if self.stats else "N/A" } class MediaAnalyzer(FFProbeEngine): """An extended engine that adds advanced parsing for video/audio streams.""" def get_metadata(self): data = super().get_metadata() if not self.raw_data: return data streams = self.raw_data.get('streams', []) v_stream = next((s for s in streams if s['codec_type'] == 'video'), {}) a_stream = next((s for s in streams if s['codec_type'] == 'audio'), {}) # Add advanced fields data.update({ 'vbitrate': v_stream.get('bit_rate', 'N/A'), 'abitrate': a_stream.get('bit_rate', 'N/A'), 'width': v_stream.get('width', 'N/A'), 'height': v_stream.get('height', 'N/A'), 'fps': self._parse_fps(v_stream.get('avg_frame_rate')), 
'artist': self._get_tag('artist'), 'album': self._get_tag('album'), 'resolution': f"{v_stream.get('width')}x{v_stream.get('height')}" if v_stream.get('width') else "N/A" }) return data def _parse_fps(self, fps_str): try: num, den = map(int, fps_str.split('/')) return round(num / den, 2) if den != 0 else "N/A" except (ValueError, TypeError, ZeroDivisionError): return "N/A" def _get_tag(self, key): tags = self.raw_data.get('format', {}).get('tags', {}) return tags.get(key) or tags.get(key.upper(), 'N/A') # --- FRONTEND LAYER --- class MediaCLI: """Handles CLI arguments, progress reporting, and formatting output.""" def __init__(self, engine_class=MediaAnalyzer): self.engine_class = engine_class self.args = self._parse_args() def _parse_args(self): parser = argparse.ArgumentParser(description="Extensible FFprobe Wrapper") parser.add_argument('files', nargs='+') parser.add_argument('-f', '--fields', default="duration,size,bitrate,resolution") parser.add_argument('-H', '--human', action='store_true', help="Human readable output") group = parser.add_mutually_exclusive_group() group.add_argument('--json', action='store_true') group.add_argument('--csv', action='store_true') return parser.parse_args() def _format_human(self, key, value): """Helper to format bits/bytes.""" try: num = float(value) is_bitrate = 'bitrate' in key base = 1000.0 if is_bitrate else 1024.0 suffix = 'bps' if is_bitrate else 'B' for unit in ['', 'K', 'M', 'G', 'T']: if abs(num) < base: return f"{num:.2f} {unit}{suffix}".strip() num /= base return f"{num:.2f} P{suffix}" except (ValueError, TypeError): return value def _format_duration(self, seconds): try: sec = float(seconds) mins, sec = divmod(int(sec), 60) hrs, mins = divmod(mins, 60) return f"{hrs:02d}:{mins:02d}:{sec:02d}" if hrs > 0 else f"{mins:02d}:{sec:02d}" except (ValueError, TypeError): return "N/A" def run(self): requested_fields = self.args.fields.split(',') results = [] for i, path in enumerate(self.args.files, 1): # Progress update 
fname = os.path.basename(path) sys.stderr.write(f"\rScanning [{i}/{len(self.args.files)}]: {fname[:30]}".ljust(60)) sys.stderr.flush() # Process data using the provided engine engine = self.engine_class(path) metadata = engine.get_metadata() if metadata: # Apply transformations (Human readable / Duration) row = {'filename': metadata['filename']} for field in requested_fields: val = metadata.get(field, "N/A") if self.args.human and field in ['size', 'bitrate', 'vbitrate', 'abitrate']: val = self._format_human(field, val) if field == 'duration': val = self._format_duration(val) row[field] = str(val) results.append(row) sys.stderr.write("\r" + " " * 70 + "\r") sys.stderr.flush() self._output(results, ['filename'] + requested_fields) def _output(self, data, headers): if self.args.json: print(json.dumps(data, indent=2)) elif self.args.csv: writer = csv.DictWriter(sys.stdout, fieldnames=headers) writer.writeheader() writer.writerows(data) else: self._print_table(data, headers) def _print_table(self, data, headers): if not data: return widths = {h: max(len(h), max((len(str(row[h])) for row in data), default=0)) for h in headers} header_line = " ".join(h.upper().ljust(widths[h]) for h in headers) print(header_line) print("-" * len(header_line)) for row in data: print(" ".join(str(row[h]).ljust(widths[h]) for h in headers)) if __name__ == "__main__": cli = MediaCLI(MediaAnalyzer) cli.run() ```