Another exercise in using Gemini.
First Version
#!/usr/bin/env python3
prompt = r"""
A python wrapper script for ffprobe. For example:
get_video_info -f bitrate,vbitrate,abitrate,duration,title a.mp4 b.mp4
writing output to the console, optionally outputting json or csv. (--json or --csv)
-f is short for --fields.
"""
prompt = r"""
could you include querying for resolution, frame rate, metadata like artist,
album, etc (so it can be used for mp3 and m4a files too)
"""
prompt = r"""
could the output be a table with the filename first, and headers
"""
prompt = r"""
When scanning, could it print out the current filename, then use \r to
overwrite the line, until done, then clear the line and print the output. Also
when scanning, include a counter e.g. 4/15 showing progress.
"""
prompt = r"""
for things like bitrate, an option to enable human readable (pick a letter for
the short version, but not -h since that's help)
"""
import subprocess
import json
import csv
import sys
import argparse
import os
def get_probe_data(file_path):
    """Run ffprobe on *file_path* and return its parsed JSON output.

    Returns the decoded dict on success, or None when ffprobe is missing,
    exits non-zero (unreadable/unsupported file), or emits malformed JSON.
    """
    cmd = [
        'ffprobe', '-v', 'quiet', '-print_format', 'json',
        '-show_format', '-show_streams', file_path
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return json.loads(result.stdout)
    # Narrowed from a blanket `except Exception`: these are the failure modes
    # this call can actually produce (missing binary, probe failure, bad
    # output); anything else is a real bug and should surface.
    except (OSError, subprocess.CalledProcessError, json.JSONDecodeError):
        return None
def format_human_size(value, suffix='B', is_bitrate=False):
    """Render a byte or bit count as a short human-readable string.

    Bitrates scale in steps of 1000 and always get a 'bps' suffix; plain
    sizes scale in steps of 1024 with *suffix* (default 'B').
    Returns "N/A" when *value* cannot be interpreted as a number.
    """
    try:
        magnitude = float(value)
    except (ValueError, TypeError):
        return "N/A"
    # Bitrates are conventionally base-10; file sizes base-2.
    step = 1000.0 if is_bitrate else 1024.0
    if is_bitrate:
        suffix = 'bps'
    for prefix in ('', 'K', 'M', 'G', 'T'):
        if abs(magnitude) < step:
            return f"{magnitude:.2f} {prefix}{suffix}"
        magnitude /= step
    # Anything beyond tera falls through to peta.
    return f"{magnitude:.2f} P{suffix}"
def format_duration(seconds):
    """Format a duration in seconds as HH:MM:SS, or MM:SS under an hour.

    Fractional seconds are truncated; non-numeric input yields "N/A".
    """
    try:
        total = int(float(seconds))
    except (ValueError, TypeError):
        return "N/A"
    hrs, remainder = divmod(total, 3600)
    mins, secs = divmod(remainder, 60)
    if hrs > 0:
        return f"{hrs:02d}:{mins:02d}:{secs:02d}"
    return f"{mins:02d}:{secs:02d}"
def extract_fields(data, requested_fields, human_readable=False):
    """Flatten parsed ffprobe JSON into a {field: string} dict.

    data: dict from get_probe_data() (None/empty -> returns None).
    requested_fields: field names to emit; unknown names map to "N/A".
    human_readable: if True, sizes/bitrates go through format_human_size.
    Every returned value is a string so output layers can align/serialize it.
    """
    if not data: return None
    fmt = data.get('format', {})
    streams = data.get('streams', [])
    tags = fmt.get('tags', {})
    # .get() rather than ['codec_type']: a stream record lacking that key
    # should be skipped, not crash the whole scan with a KeyError.
    v_stream = next((s for s in streams if s.get('codec_type') == 'video'), {})
    a_stream = next((s for s in streams if s.get('codec_type') == 'audio'), {})
    fr_raw = v_stream.get('avg_frame_rate', '0/0')
    try:
        num, den = map(int, fr_raw.split('/'))
        fps = round(num / den, 2) if den != 0 else "N/A"
    except (ValueError, ZeroDivisionError):
        fps = "N/A"
    # Raw Data Mapping (tags are tried lower- then upper-case, since tag
    # capitalisation varies between container formats)
    raw_vals = {
        'bitrate': fmt.get('bit_rate'),
        'vbitrate': v_stream.get('bit_rate'),
        'abitrate': a_stream.get('bit_rate'),
        'size': fmt.get('size'),
        'duration': fmt.get('duration'),
        'width': v_stream.get('width', 'N/A'),
        'height': v_stream.get('height', 'N/A'),
        'fps': fps,
        'title': tags.get('title') or tags.get('TITLE', 'N/A'),
        'artist': tags.get('artist') or tags.get('ARTIST', 'N/A'),
        'album': tags.get('album') or tags.get('ALBUM', 'N/A'),
    }
    # Final logic processing
    res = {}
    for f in requested_fields:
        val = raw_vals.get(f, "N/A")
        if val is None:
            # ffprobe omits fields it cannot determine; normalise missing
            # values to the script-wide "N/A" marker instead of str(None).
            val = "N/A"
        if human_readable and val != "N/A":
            if f in ['bitrate', 'vbitrate', 'abitrate']:
                val = format_human_size(val, is_bitrate=True)
            elif f == 'size':
                val = format_human_size(val)
        if f == 'duration':
            val = format_duration(val)
        elif f == 'resolution':
            # 'resolution' is synthesised from width x height.
            val = f"{raw_vals['width']}x{raw_vals['height']}" if raw_vals['width'] != 'N/A' else "N/A"
        res[f] = str(val)
    return res
def print_table(data, headers):
    """Render rows as an aligned text table with an underlined header row.

    data: list of dicts keyed by every name in *headers*.
    Prints nothing at all when data is empty.
    """
    if not data:
        return
    # Each column is as wide as its widest cell, header included.
    widths = {}
    for h in headers:
        widths[h] = max(len(h), *(len(str(r[h])) for r in data))
    header_line = "  ".join(h.upper().ljust(widths[h]) for h in headers)
    print(header_line)
    print("-" * len(header_line))
    for r in data:
        print("  ".join(str(r[h]).ljust(widths[h]) for h in headers))
def main():
    """CLI entry point: parse arguments, probe each file, emit the report."""
    parser = argparse.ArgumentParser(description="FFprobe Wrapper")
    parser.add_argument('files', nargs='+')
    parser.add_argument('-f', '--fields', default="duration,size,bitrate,resolution")
    parser.add_argument('-H', '--human', action='store_true', help="Human readable sizes/bitrates")
    group = parser.add_mutually_exclusive_group()
    group.add_argument('--json', action='store_true')
    group.add_argument('--csv', action='store_true')
    args = parser.parse_args()

    requested_fields = args.fields.split(',')
    all_headers = ['filename'] + requested_fields
    total = len(args.files)
    results = []
    for idx, file_path in enumerate(args.files, 1):
        base_name = os.path.basename(file_path)
        # Trim long names so the progress line stays within its padding.
        if len(base_name) > 32:
            display_name = base_name[:30] + '..'
        else:
            display_name = base_name
        # \r rewrites the same stderr line; ljust(60) clears leftovers.
        sys.stderr.write(f"\rScanning [{idx}/{total}]: {display_name}".ljust(60))
        sys.stderr.flush()
        probe = get_probe_data(file_path)
        if not probe:
            continue
        info = extract_fields(probe, requested_fields, args.human)
        info['filename'] = base_name
        results.append(info)
    # Blank out the progress line before printing the real output.
    sys.stderr.write("\r" + " " * 70 + "\r")
    sys.stderr.flush()
    if args.json:
        print(json.dumps(results, indent=2))
    elif args.csv:
        writer = csv.DictWriter(sys.stdout, fieldnames=all_headers)
        writer.writeheader()
        writer.writerows(results)
    else:
        print_table(results, all_headers)
# Run the CLI only when executed directly (not on import).
if __name__ == "__main__":
    main()
Asking for It to Be Packaged in Classes
#!/usr/bin/env python3
# following from vinf
prompt = r"""
Could you organise this as a class which can be extended by subclassing. So it handles stuff like reading and parsing the json into a data structure, and for good measure including what's available via stat(), so that I can write methods to parse and return stuff in a subclass.
Then a front end class which takes a backend class and handles command line and output stuff.
"""
import subprocess
import json
import csv
import sys
import argparse
import os
from datetime import datetime
# --- BACKEND LAYER ---
class FFProbeEngine:
    """Handles the raw data acquisition from ffprobe and system stat.

    Subclasses override/extend get_metadata() to expose more fields; the
    parsed ffprobe JSON is kept on self.raw_data and the os.stat result
    on self.stats so subclass methods can parse whatever they need.
    """
    def __init__(self, file_path):
        self.file_path = file_path
        self.raw_data = self._probe()   # parsed ffprobe JSON dict, or None on failure
        self.stats = self._get_stats()  # os.stat_result, or None if unreadable

    def _probe(self):
        """Run ffprobe and return the parsed JSON, or None on any probe failure."""
        cmd = [
            'ffprobe', '-v', 'quiet', '-print_format', 'json',
            '-show_format', '-show_streams', self.file_path
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            return json.loads(result.stdout)
        # Narrowed from a blanket `except Exception`: only the failure modes
        # this call can actually raise (missing binary, probe failure,
        # malformed output) are treated as "no data".
        except (OSError, subprocess.CalledProcessError, json.JSONDecodeError):
            return None

    def _get_stats(self):
        """Return standard OS file statistics, or None when stat() fails."""
        try:
            return os.stat(self.file_path)
        except OSError:
            return None

    def get_metadata(self):
        """
        Base method to return a flat dict of fields.
        Meant to be overridden or extended by subclasses.
        Returns {} when the probe produced no data.
        """
        if not self.raw_data:
            return {}
        fmt = self.raw_data.get('format', {})
        tags = fmt.get('tags', {})
        # Default basic extraction; tag capitalisation varies by container,
        # so try lower- then upper-case.
        return {
            'filename': os.path.basename(self.file_path),
            'size': int(fmt.get('size', 0)),
            'duration': fmt.get('duration'),
            'bitrate': fmt.get('bit_rate'),
            'title': tags.get('title') or tags.get('TITLE', 'N/A'),
            'mtime': datetime.fromtimestamp(self.stats.st_mtime).isoformat() if self.stats else "N/A"
        }
class MediaAnalyzer(FFProbeEngine):
    """An extended engine that adds advanced parsing for video/audio streams."""

    def get_metadata(self):
        """Extend the base fields with stream-level (video/audio) details."""
        data = super().get_metadata()
        if not self.raw_data:
            return data
        streams = self.raw_data.get('streams', [])
        # .get() rather than ['codec_type']: a stream record lacking that key
        # should simply be ignored, not raise KeyError mid-scan.
        v_stream = next((s for s in streams if s.get('codec_type') == 'video'), {})
        a_stream = next((s for s in streams if s.get('codec_type') == 'audio'), {})
        # Add advanced fields
        data.update({
            'vbitrate': v_stream.get('bit_rate', 'N/A'),
            'abitrate': a_stream.get('bit_rate', 'N/A'),
            'width': v_stream.get('width', 'N/A'),
            'height': v_stream.get('height', 'N/A'),
            'fps': self._parse_fps(v_stream.get('avg_frame_rate')),
            'artist': self._get_tag('artist'),
            'album': self._get_tag('album'),
            'resolution': f"{v_stream.get('width')}x{v_stream.get('height')}" if v_stream.get('width') else "N/A"
        })
        return data

    def _parse_fps(self, fps_str):
        """Convert ffprobe's 'num/den' frame-rate string to a rounded float, or "N/A"."""
        try:
            num, den = map(int, fps_str.split('/'))
            return round(num / den, 2) if den != 0 else "N/A"
        except (ValueError, TypeError, ZeroDivisionError):
            return "N/A"

    def _get_tag(self, key):
        """Look up a format tag, trying lower-case then upper-case spellings."""
        tags = self.raw_data.get('format', {}).get('tags', {})
        return tags.get(key) or tags.get(key.upper(), 'N/A')
# --- FRONTEND LAYER ---
class MediaCLI:
    """Handles CLI arguments, progress reporting, and formatting output.

    Takes a backend engine class (anything matching the FFProbeEngine
    interface: constructor taking a path, plus get_metadata()) so the data
    layer can be swapped by subclassing.
    """
    def __init__(self, engine_class=MediaAnalyzer):
        # engine_class is instantiated once per input file in run().
        self.engine_class = engine_class
        # NOTE(review): argument parsing happens at construction time, so
        # creating a MediaCLI reads sys.argv immediately.
        self.args = self._parse_args()
    def _parse_args(self):
        """Define and parse the command-line interface."""
        parser = argparse.ArgumentParser(description="Extensible FFprobe Wrapper")
        parser.add_argument('files', nargs='+')
        parser.add_argument('-f', '--fields', default="duration,size,bitrate,resolution")
        parser.add_argument('-H', '--human', action='store_true', help="Human readable output")
        group = parser.add_mutually_exclusive_group()
        group.add_argument('--json', action='store_true')
        group.add_argument('--csv', action='store_true')
        return parser.parse_args()
    def _format_human(self, key, value):
        """Helper to format bits/bytes.

        Keys containing 'bitrate' scale by 1000 with a 'bps' suffix;
        everything else by 1024 with 'B'. Non-numeric values are returned
        unchanged (unlike the duration helper, which returns "N/A").
        """
        try:
            num = float(value)
            is_bitrate = 'bitrate' in key
            # Bitrates are conventionally base-10; sizes base-2.
            base = 1000.0 if is_bitrate else 1024.0
            suffix = 'bps' if is_bitrate else 'B'
            for unit in ['', 'K', 'M', 'G', 'T']:
                if abs(num) < base:
                    return f"{num:.2f} {unit}{suffix}".strip()
                num /= base
            return f"{num:.2f} P{suffix}"
        except (ValueError, TypeError):
            return value
    def _format_duration(self, seconds):
        """Render seconds as HH:MM:SS (MM:SS under an hour); "N/A" if non-numeric."""
        try:
            sec = float(seconds)
            mins, sec = divmod(int(sec), 60)
            hrs, mins = divmod(mins, 60)
            return f"{hrs:02d}:{mins:02d}:{sec:02d}" if hrs > 0 else f"{mins:02d}:{sec:02d}"
        except (ValueError, TypeError):
            return "N/A"
    def run(self):
        """Scan every input file (with a stderr progress line) and emit output."""
        requested_fields = self.args.fields.split(',')
        results = []
        for i, path in enumerate(self.args.files, 1):
            # Progress update: \r rewrites the same stderr line each pass,
            # and ljust(60) blanks out leftovers from longer prior names.
            fname = os.path.basename(path)
            sys.stderr.write(f"\rScanning [{i}/{len(self.args.files)}]: {fname[:30]}".ljust(60))
            sys.stderr.flush()
            # Process data using the provided engine
            engine = self.engine_class(path)
            metadata = engine.get_metadata()
            if metadata:
                # Apply transformations (Human readable / Duration)
                row = {'filename': metadata['filename']}
                for field in requested_fields:
                    val = metadata.get(field, "N/A")
                    if self.args.human and field in ['size', 'bitrate', 'vbitrate', 'abitrate']:
                        val = self._format_human(field, val)
                    if field == 'duration':
                        val = self._format_duration(val)
                    row[field] = str(val)
                results.append(row)
        # Erase the progress line before printing the real results.
        sys.stderr.write("\r" + " " * 70 + "\r")
        sys.stderr.flush()
        self._output(results, ['filename'] + requested_fields)
    def _output(self, data, headers):
        """Dispatch to JSON, CSV, or table rendering based on the CLI flags."""
        if self.args.json:
            print(json.dumps(data, indent=2))
        elif self.args.csv:
            writer = csv.DictWriter(sys.stdout, fieldnames=headers)
            writer.writeheader()
            writer.writerows(data)
        else:
            self._print_table(data, headers)
    def _print_table(self, data, headers):
        """Print rows as an aligned, underlined text table (no-op for empty data)."""
        if not data: return
        # Column width = widest cell in the column, header included.
        widths = {h: max(len(h), max((len(str(row[h])) for row in data), default=0)) for h in headers}
        header_line = "  ".join(h.upper().ljust(widths[h]) for h in headers)
        print(header_line)
        print("-" * len(header_line))
        for row in data:
            print("  ".join(str(row[h]).ljust(widths[h]) for h in headers))
# Script entry point: wire the default analyzer into the CLI and run it.
if __name__ == "__main__":
    cli = MediaCLI(MediaAnalyzer)
    cli.run()