Dup Ver Goto 📝

SimpleVideoInfoQuery

To
385 lines, 1295 words, 13398 chars Page 'SimpleVideoInfoQuery' does not exist.

Another exercise in using Gemini.

First Version

#!/usr/bin/env python3

# The prompts that produced this script, kept in the order they were given.
# Each assignment rebinds the same name, so only the last string remains bound
# to `prompt`; nothing below reads it.
prompt = r"""
A python wrapper script for ffprobe. For example:
get_video_info -f bitrate,vbitrate,abitrate,duration,title a.mp4 b.mp4
writing output to the console, optionally outputting json or csv. (--json or --csv)
-f is short for --fields.
"""

prompt = r"""
could you include querying for resolution, frame rate, metadata like artist,
album, etc (so it can be used for mp3 and m4a files too)
"""

prompt = r"""
could the output be a table with the filename first, and headers
"""

prompt = r"""
When scanning, could it print out the current filename, then use \r to
overwrite the line, until done, then clear the line and print the output. Also
when scanning, include a counter e.g. 4/15 showing progress.
"""

prompt = r"""
for things like bitrate, an option to enable human readable (pick a letter for
the short version, but not -h since that's help)
"""

import subprocess
import json
import csv
import sys
import argparse
import os

def get_probe_data(file_path):
    """Run ffprobe on *file_path* and return its parsed JSON report.

    ffprobe is asked for the container section (``-show_format``) and the
    per-stream section (``-show_streams``) in JSON form.

    Args:
        file_path: Path of the media file to inspect.

    Returns:
        The decoded JSON document, or ``None`` when ffprobe cannot be
        launched, exits with a non-zero status, or produces output that
        cannot be decoded as JSON.
    """
    cmd = [
        'ffprobe', '-v', 'quiet', '-print_format', 'json',
        '-show_format', '-show_streams', file_path
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return json.loads(result.stdout)
    except (OSError, subprocess.SubprocessError, ValueError):
        # OSError: ffprobe is missing or not executable.
        # SubprocessError: non-zero exit status (raised because check=True).
        # ValueError: output that is not valid text/JSON (JSONDecodeError and
        # UnicodeDecodeError are both ValueError subclasses).
        # Anything else is a programming error and is allowed to propagate.
        return None

def format_human_size(value, suffix='B', is_bitrate=False):
    """Render a byte or bit count with a K/M/G/T/P magnitude prefix.

    Args:
        value: Anything ``float()`` accepts (number or numeric string).
        suffix: Unit label appended after the prefix; ignored when
            *is_bitrate* is true, in which case ``'bps'`` is used.
        is_bitrate: Scale by 1000 instead of 1024.

    Returns:
        A string with two decimals, e.g. ``"1.50 KB"`` or ``"128.00 Kbps"``,
        or ``"N/A"`` when *value* cannot be converted to a float.
    """
    try:
        amount = float(value)
    except (ValueError, TypeError):
        return "N/A"

    # Bitrates step in powers of 1000, sizes in powers of 1024.
    if is_bitrate:
        step, label = 1000.0, 'bps'
    else:
        step, label = 1024.0, suffix

    for prefix in ('', 'K', 'M', 'G', 'T'):
        if abs(amount) < step:
            rendered = f"{amount:.2f} {prefix}{label}"
            # Strip so an empty prefix plus empty label leaves no trailing space.
            return rendered.strip()
        amount /= step

    # Still too large after the tera step: report in peta units.
    return f"{amount:.2f} P{label}"

def format_duration(seconds):
    """Format a duration in seconds as ``MM:SS`` or ``HH:MM:SS``.

    The fractional part is truncated. Hours are shown only when non-zero.
    A negative duration is rendered with a leading ``-`` on its absolute
    value rather than wrapping around through ``divmod``.

    Args:
        seconds: Anything ``float()`` accepts (number or numeric string).

    Returns:
        The formatted string, or ``"N/A"`` when *seconds* is not a finite
        number.
    """
    try:
        whole = int(float(seconds))
    except (ValueError, TypeError, OverflowError):
        # ValueError: non-numeric text or NaN; TypeError: None and similar;
        # OverflowError: +/-infinity cannot be converted to int.
        return "N/A"

    sign = "-" if whole < 0 else ""
    mins, sec = divmod(abs(whole), 60)
    hrs, mins = divmod(mins, 60)
    clock = f"{hrs:02d}:{mins:02d}:{sec:02d}" if hrs > 0 else f"{mins:02d}:{sec:02d}"
    return sign + clock

def extract_fields(data, requested_fields, human_readable=False):
    """Pick the requested fields out of a parsed ffprobe report.

    Values come from the ``format`` section, its ``tags``, and the first
    video and first audio entries of ``streams``. Anything that is absent
    (or present as ``None``) is reported as ``"N/A"``.

    Args:
        data: Parsed ffprobe JSON (a dict), or a falsy value.
        requested_fields: Iterable of field names. Known names are bitrate,
            vbitrate, abitrate, size, duration, width, height, fps, title,
            artist, album and resolution; unknown names yield ``"N/A"``.
        human_readable: When true, bitrates and size are passed through
            ``format_human_size``.

    Returns:
        A dict mapping each requested field to a string, or ``None`` when
        *data* is falsy.
    """
    if not data:
        return None
    fmt = data.get('format', {})
    streams = data.get('streams', [])
    tags = fmt.get('tags', {})
    # .get() so a stream entry without 'codec_type' is skipped, not a KeyError.
    v_stream = next((s for s in streams if s.get('codec_type') == 'video'), {})
    a_stream = next((s for s in streams if s.get('codec_type') == 'audio'), {})

    # ffprobe reports the frame rate as a "num/den" fraction string.
    fr_raw = v_stream.get('avg_frame_rate', '0/0')
    try:
        num, den = map(int, fr_raw.split('/'))
        fps = round(num / den, 2) if den != 0 else "N/A"
    except (ValueError, AttributeError, ZeroDivisionError):
        # AttributeError covers a non-string value such as an explicit null.
        fps = "N/A"

    # Raw Data Mapping
    raw_vals = {
        'bitrate': fmt.get('bit_rate'),
        'vbitrate': v_stream.get('bit_rate'),
        'abitrate': a_stream.get('bit_rate'),
        'size': fmt.get('size'),
        'duration': fmt.get('duration'),
        'width': v_stream.get('width', 'N/A'),
        'height': v_stream.get('height', 'N/A'),
        'fps': fps,
        'title': tags.get('title') or tags.get('TITLE', 'N/A'),
        'artist': tags.get('artist') or tags.get('ARTIST', 'N/A'),
        'album': tags.get('album') or tags.get('ALBUM', 'N/A'),
    }
    # Normalise missing values up front; otherwise str(None) would leak the
    # literal text "None" into the output whenever human_readable is off.
    raw_vals = {key: "N/A" if val is None else val for key, val in raw_vals.items()}

    # Final logic processing
    res = {}
    for f in requested_fields:
        val = raw_vals.get(f, "N/A")

        if human_readable and val != "N/A":
            if f in ('bitrate', 'vbitrate', 'abitrate'):
                val = format_human_size(val, is_bitrate=True)
            elif f == 'size':
                val = format_human_size(val)

        if f == 'duration':
            val = format_duration(val)
        elif f == 'resolution':
            # Derived field: needs both dimensions to be meaningful.
            width, height = raw_vals['width'], raw_vals['height']
            val = f"{width}x{height}" if "N/A" not in (width, height) else "N/A"

        res[f] = str(val)
    return res

def print_table(data, headers):
    """Print rows as a left-aligned text table with an upper-cased header.

    Args:
        data: List of dicts; every dict must contain every name in *headers*.
        headers: Column names, in display order.

    Prints nothing when *data* is empty. Columns are separated by two spaces
    and each is as wide as its longest cell or its header, whichever is
    longer. A dashed rule the width of the header line follows the header.
    """
    if not data:
        return

    col_width = {}
    for name in headers:
        longest_cell = max(len(str(entry[name])) for entry in data)
        col_width[name] = max(len(name), longest_cell)

    gap = "  "
    title_row = gap.join(name.upper().ljust(col_width[name]) for name in headers)
    lines = [title_row, "-" * len(title_row)]
    for entry in data:
        cells = [str(entry[name]).ljust(col_width[name]) for name in headers]
        lines.append(gap.join(cells))

    for line in lines:
        print(line)

def main():
    """Command-line entry point: probe each file and print the chosen fields.

    Progress is written to stderr on a single line that is rewritten with
    ``\\r`` and cleared when scanning ends. Results go to stdout as a table
    by default, or as JSON (``--json``) or CSV (``--csv``). Files for which
    ``get_probe_data`` returns nothing are left out of the results and
    listed on stderr afterwards.
    """
    parser = argparse.ArgumentParser(description="FFprobe Wrapper")
    parser.add_argument('files', nargs='+')
    parser.add_argument('-f', '--fields', default="duration,size,bitrate,resolution")
    parser.add_argument('-H', '--human', action='store_true', help="Human readable sizes/bitrates")
    group = parser.add_mutually_exclusive_group()
    group.add_argument('--json', action='store_true')
    group.add_argument('--csv', action='store_true')

    args = parser.parse_args()
    # Strip whitespace and drop empty entries so "-f 'duration, size'" or a
    # trailing comma does not create an unknown field like " size" or "".
    requested_fields = [name.strip() for name in args.fields.split(',') if name.strip()]
    all_headers = ['filename'] + requested_fields

    results = []
    failed = []
    total = len(args.files)
    for i, file_path in enumerate(args.files, 1):
        base_name = os.path.basename(file_path)
        # Keep the progress line short enough to be overwritten cleanly.
        display_name = (base_name[:30] + '..') if len(base_name) > 32 else base_name
        sys.stderr.write(f"\rScanning [{i}/{total}]: {display_name}".ljust(60))
        sys.stderr.flush()

        raw_data = get_probe_data(file_path)
        if not raw_data:
            failed.append(file_path)
            continue
        info = extract_fields(raw_data, requested_fields, args.human)
        info['filename'] = base_name
        results.append(info)

    # Blank out the progress line before any real output appears.
    sys.stderr.write("\r" + " " * 70 + "\r")
    sys.stderr.flush()

    # Report skipped files instead of dropping them without a trace.
    for file_path in failed:
        sys.stderr.write(f"Skipped (ffprobe failed): {file_path}\n")

    if args.json:
        print(json.dumps(results, indent=2))
    elif args.csv:
        writer = csv.DictWriter(sys.stdout, fieldnames=all_headers)
        writer.writeheader()
        writer.writerows(results)
    else:
        print_table(results, all_headers)

# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()

Asking For It Packaged In Classes

#!/usr/bin/env python3

# following from vinf
# The prompt that produced this class-based version; nothing below reads it.
prompt = r"""
Could you organise this as a class which can be extended by subclassing. So it handles stuff like reading and parsing the json into a data structure, and for good measure including what's available via stat(), so that I can write methods to parse and return stuff in a subclass.

Then a front end class which takes a backend class and handles command line and output stuff.
"""

import subprocess
import json
import csv
import sys
import argparse
import os
from datetime import datetime

# --- BACKEND LAYER ---

class FFProbeEngine:
    """Collect raw ffprobe output and ``os.stat`` data for one file.

    Attributes:
        file_path: The path given to the constructor.
        raw_data: Parsed ffprobe JSON, or ``None`` if probing failed.
        stats: The ``os.stat`` result, or ``None`` if the call failed.

    Subclasses extend ``get_metadata`` to expose further fields.
    """

    def __init__(self, file_path):
        """Probe and stat *file_path* immediately."""
        self.file_path = file_path
        self.raw_data = self._probe()
        self.stats = self._get_stats()

    def _probe(self):
        """Run ffprobe and return the parsed JSON, or ``None`` on failure.

        Failure means ffprobe could not be launched, exited with a non-zero
        status, or produced output that is not decodable JSON.
        """
        cmd = [
            'ffprobe', '-v', 'quiet', '-print_format', 'json',
            '-show_format', '-show_streams', self.file_path
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            return json.loads(result.stdout)
        except (OSError, subprocess.SubprocessError, ValueError):
            # OSError: ffprobe missing; SubprocessError: non-zero exit;
            # ValueError: undecodable or non-JSON output. Other exceptions
            # indicate programming errors and are allowed to propagate.
            return None

    def _get_stats(self):
        """Return ``os.stat(self.file_path)``, or ``None`` on ``OSError``."""
        try:
            return os.stat(self.file_path)
        except OSError:
            return None

    def get_metadata(self):
        """Return a flat dict of basic fields for the file.

        Meant to be overridden or extended by subclasses.

        Returns:
            ``{}`` when there is no ffprobe data. Otherwise a dict with
            ``filename``, ``size`` (int, 0 if absent), ``duration``,
            ``bitrate``, ``title`` and ``mtime`` (ISO-8601 local time).
            Fields that are absent are reported as ``'N/A'``.
        """
        if not self.raw_data:
            return {}

        fmt = self.raw_data.get('format', {})
        tags = fmt.get('tags', {})
        duration = fmt.get('duration')
        bitrate = fmt.get('bit_rate')

        # Default basic extraction. Missing duration/bitrate use the same
        # 'N/A' placeholder as title and mtime rather than None, so a caller
        # that stringifies the values never shows the text "None".
        return {
            'filename': os.path.basename(self.file_path),
            'size': int(fmt.get('size', 0)),
            'duration': 'N/A' if duration is None else duration,
            'bitrate': 'N/A' if bitrate is None else bitrate,
            'title': tags.get('title') or tags.get('TITLE', 'N/A'),
            'mtime': datetime.fromtimestamp(self.stats.st_mtime).isoformat() if self.stats else "N/A"
        }

class MediaAnalyzer(FFProbeEngine):
    """Engine that adds video/audio stream fields and artist/album tags."""

    def get_metadata(self):
        """Extend the base metadata with stream-level fields.

        Returns:
            The base class dict, unchanged when there is no ffprobe data.
            Otherwise it is extended with ``vbitrate``, ``abitrate``,
            ``width``, ``height``, ``fps``, ``artist``, ``album`` and
            ``resolution``. Stream values come from the first video and
            first audio stream; anything unavailable is ``'N/A'``.
        """
        data = super().get_metadata()
        if not self.raw_data:
            return data

        streams = self.raw_data.get('streams', [])
        # .get() so a stream entry without 'codec_type' is skipped, not a KeyError.
        v_stream = next((s for s in streams if s.get('codec_type') == 'video'), {})
        a_stream = next((s for s in streams if s.get('codec_type') == 'audio'), {})

        # Add advanced fields
        data.update({
            'vbitrate': v_stream.get('bit_rate', 'N/A'),
            'abitrate': a_stream.get('bit_rate', 'N/A'),
            'width': v_stream.get('width', 'N/A'),
            'height': v_stream.get('height', 'N/A'),
            'fps': self._parse_fps(v_stream.get('avg_frame_rate')),
            'artist': self._get_tag('artist'),
            'album': self._get_tag('album'),
            'resolution': f"{v_stream.get('width')}x{v_stream.get('height')}" if v_stream.get('width') else "N/A"
        })
        return data

    def _parse_fps(self, fps_str):
        """Convert a ``"num/den"`` fraction string to a float with 2 decimals.

        Returns ``"N/A"`` for ``None`` (no video stream, so no
        ``avg_frame_rate``), for a zero denominator, and for anything that
        is not two integers separated by ``/``.
        """
        try:
            num, den = map(int, fps_str.split('/'))
            return round(num / den, 2) if den != 0 else "N/A"
        except (ValueError, TypeError, AttributeError, ZeroDivisionError):
            # AttributeError: fps_str is None or another non-string. Without
            # it, every audio-only file made get_metadata raise.
            return "N/A"

    def _get_tag(self, key):
        """Look up a format-level tag by lower-case, then upper-case, key."""
        tags = self.raw_data.get('format', {}).get('tags', {})
        return tags.get(key) or tags.get(key.upper(), 'N/A')


# --- FRONTEND LAYER ---

class MediaCLI:
    """Command-line front end: argument parsing, progress and output.

    The engine class is instantiated once per file with the file path as its
    only argument, and must provide ``get_metadata()`` returning a dict
    (empty or falsy when the file could not be read).
    """

    def __init__(self, engine_class=MediaAnalyzer):
        """Store the engine class and parse ``sys.argv`` immediately."""
        self.engine_class = engine_class
        self.args = self._parse_args()

    def _parse_args(self):
        """Build the argument parser and return the parsed namespace."""
        parser = argparse.ArgumentParser(description="Extensible FFprobe Wrapper")
        parser.add_argument('files', nargs='+')
        parser.add_argument('-f', '--fields', default="duration,size,bitrate,resolution")
        parser.add_argument('-H', '--human', action='store_true', help="Human readable output")
        group = parser.add_mutually_exclusive_group()
        group.add_argument('--json', action='store_true')
        group.add_argument('--csv', action='store_true')
        return parser.parse_args()

    def _format_human(self, key, value):
        """Format a bit or byte count with a K/M/G/T/P prefix.

        Keys containing ``'bitrate'`` scale by 1000 and use ``bps``; all
        others scale by 1024 and use ``B``. A value that cannot be converted
        to a float is returned unchanged.
        """
        try:
            num = float(value)
            is_bitrate = 'bitrate' in key
            base = 1000.0 if is_bitrate else 1024.0
            suffix = 'bps' if is_bitrate else 'B'
            for unit in ['', 'K', 'M', 'G', 'T']:
                if abs(num) < base:
                    return f"{num:.2f} {unit}{suffix}".strip()
                num /= base
            return f"{num:.2f} P{suffix}"
        except (ValueError, TypeError):
            return value

    def _format_duration(self, seconds):
        """Format seconds as ``MM:SS`` or ``HH:MM:SS``; ``"N/A"`` if not finite."""
        try:
            sec = float(seconds)
            mins, sec = divmod(int(sec), 60)
            hrs, mins = divmod(mins, 60)
            return f"{hrs:02d}:{mins:02d}:{sec:02d}" if hrs > 0 else f"{mins:02d}:{sec:02d}"
        except (ValueError, TypeError, OverflowError):
            # OverflowError: int() of +/-infinity.
            return "N/A"

    def run(self):
        """Probe every file, build display rows and emit them.

        Progress goes to stderr on one line rewritten with ``\\r``. Files
        whose engine returns no metadata are skipped and listed on stderr
        once scanning has finished.
        """
        # Strip whitespace and drop empties so "-f 'duration, size'" works.
        requested_fields = [name.strip() for name in self.args.fields.split(',') if name.strip()]
        results = []
        failed = []
        total = len(self.args.files)

        for i, path in enumerate(self.args.files, 1):
            # Progress update
            fname = os.path.basename(path)
            sys.stderr.write(f"\rScanning [{i}/{total}]: {fname[:30]}".ljust(60))
            sys.stderr.flush()

            # Process data using the provided engine
            engine = self.engine_class(path)
            metadata = engine.get_metadata()

            if not metadata:
                failed.append(path)
                continue

            # Apply transformations (Human readable / Duration).
            # Fall back to the basename if a custom engine omits 'filename'.
            row = {'filename': metadata.get('filename', fname)}
            for field in requested_fields:
                val = metadata.get(field, "N/A")
                if val is None:
                    # Engines may report absent values as None; show the
                    # placeholder rather than the literal text "None".
                    val = "N/A"
                if self.args.human and field in ('size', 'bitrate', 'vbitrate', 'abitrate'):
                    val = self._format_human(field, val)
                if field == 'duration':
                    val = self._format_duration(val)
                row[field] = str(val)
            results.append(row)

        # Blank out the progress line before any real output appears.
        sys.stderr.write("\r" + " " * 70 + "\r")
        sys.stderr.flush()

        for path in failed:
            sys.stderr.write(f"Skipped (no metadata): {path}\n")

        self._output(results, ['filename'] + requested_fields)

    def _output(self, data, headers):
        """Write *data* to stdout as JSON, CSV or a text table per the flags."""
        if self.args.json:
            print(json.dumps(data, indent=2))
        elif self.args.csv:
            writer = csv.DictWriter(sys.stdout, fieldnames=headers)
            writer.writeheader()
            writer.writerows(data)
        else:
            self._print_table(data, headers)

    def _print_table(self, data, headers):
        """Print a left-aligned table with an upper-cased header and a rule.

        Prints nothing when *data* is empty.
        """
        if not data:
            return
        widths = {h: max(len(h), max((len(str(row[h])) for row in data), default=0)) for h in headers}
        header_line = "  ".join(h.upper().ljust(widths[h]) for h in headers)
        print(header_line)
        print("-" * len(header_line))
        for row in data:
            print("  ".join(str(row[h]).ljust(widths[h]) for h in headers))

# Run the CLI with the MediaAnalyzer engine only when executed as a script.
if __name__ == "__main__":
    cli = MediaCLI(MediaAnalyzer)
    cli.run()