title: Bobbins OSC Server
tags: python osc net bobbins

I don't know how I ended up with the name `bobbins`, but it stuck.

At first, I just wanted to run macros and scripts triggered by a MIDI keyboard, when running non-music software. This could be anything from play/pause/etc my music player, or play/pause a VLC instance (via telnet) when pressing the soft pedal (useful for transcribing text, and the midi keys can be used to skip by various amounts).

I hit an odd problem: for `rtmidi` to work in Windows I needed to use anaconda python rather than cygwin python. But for the scripts I wanted to use cygwin (so I could use bash and my array of *nix scripts).

So the approach I ended up with was to run the `rtmidi` in a python process via anaconda, and send OSC to somewhere else to run a script.

# Server

```py
#!/usr/bin/env python
import math
from subprocess import run
from pythonosc.dispatcher import Dispatcher
from pythonosc import osc_server
from threading import Thread
import sys
import os


class Bobbins:
    """OSC server that runs a command for every OSC message it receives.

    A message to ``/cmd`` changes (or, with no arguments, prints) the
    command.  Any other message runs the command with the OSC address
    (leading slashes stripped) as the first argument, followed by the
    message arguments converted to strings.
    """

    def __init__(self, host, port, cmd="echo"):
        self.cmd = cmd
        self.host = host
        self.port = port
        # Joined by join(); nothing in this script adds to it.
        self.commands = []

    def set_cmd(self, cmd):
        """Replace the command run for incoming messages."""
        self.cmd = cmd

    def join(self):
        """Call join() on every entry of self.commands."""
        for command in self.commands:
            command.join()

    def handle_osc(self, addr, *params):
        """Default OSC handler: update the command or run it.

        addr is the OSC address of the message, params its arguments.
        """
        print("osc", addr, params)
        addr = addr.lstrip("/")
        if addr == "cmd":
            # NOTE(review): any host that can reach this port can choose the
            # executable that gets run.  Only use this on a trusted network.
            if params:
                # OSC arguments may be ints/floats; the "/" test below
                # needs a string.
                self.cmd = str(params[0])
                print(f"Command now {self.cmd}")
            else:
                print(f"Command is {self.cmd}")
            return
        # A bare name means a script in the current directory, not on PATH.
        if "/" not in self.cmd:
            cmd = "./" + self.cmd
        else:
            cmd = self.cmd
        cmdline = [cmd, addr, *(str(x) for x in params)]
        print(f"Running {cmdline}")
        try:
            run(cmdline)
        except OSError as err:
            # Missing or non-executable script: report it and keep serving.
            print(f"Failed {cmdline}: {err}")
            return
        print(f"Finished {cmdline}")

    def start(self):
        """Bind the UDP server and serve until interrupted."""
        dispatcher = Dispatcher()
        dispatcher.set_default_handler(self.handle_osc)
        # Threading server hangs with cygwin
        server = osc_server.BlockingOSCUDPServer(
            (self.host, self.port), dispatcher)
        print("Serving on {}".format(server.server_address))
        print(f"Command is {self.cmd}")
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            print("Ctrl-C")
        finally:
            # Release the socket however the loop ended.
            server.server_close()


def main():
    """Run the server.

    The command comes from the first command-line argument (default
    ``./bobbins``) and the port from the PORT environment variable
    (default 4001, must be between 1024 and 32000).
    """
    args = sys.argv[1:]
    cmd = args[0] if args else "./bobbins"
    port = os.getenv("PORT", 4001)
    try:
        port = int(port)
        if port < 1024 or port > 32000:
            raise ValueError()
    except ValueError:
        print(f"Invalid port {port}")
        # sys.exit rather than the site-provided exit(), which is not
        # guaranteed to exist (e.g. python -S).
        sys.exit(1)
    host = "0.0.0.0"
    bobbins = Bobbins(host, port, cmd=cmd)
    bobbins.start()


if __name__ == "__main__":
    main()
```

# Example Bobbins Script

```bash
#!/bin/bash
echo "BOBBINS $*"
if [ "$1" = "note" ]; then
    case "$2" in
        59) tvlcmd hostname index pause;;
        58) tvlcmd hostname index f;;
    esac
fi
```

# Midi to OSC Bobbins

```py
#!/usr/bin/env python
from rtmidi import MidiIn
from pythonosc import udp_client
from time import sleep
import json
from icecream import ic; ic.configureOutput(includeContext=True)
import re
import sys
import os


class Bobbins:
    """Forward MIDI note-on events to the Bobbins server as OSC messages."""

    def main(self):
        """Open the MIDI port matching DEV and forward until Ctrl-C.

        BOBBINS_HOST / BOBBINS_PORT select the OSC target (default
        pi3:4001); DEV is a substring of the MIDI port name (default M32).
        """
        host = os.getenv("BOBBINS_HOST", "pi3")
        port = int(os.getenv("BOBBINS_PORT", 4001))
        self.client = udp_client.SimpleUDPClient(host, port)
        pat = os.getenv("DEV", "M32")
        midiin = MidiIn()
        m32 = None
        for i, x in enumerate(midiin.get_ports()):
            if pat in x:
                m32 = i
                break
        else:
            print(f"Cannot find {pat}")
            sys.exit(1)
        print(f"Found {m32}:{x} for {pat}")
        midiin.set_callback(self.handle)
        midiin.open_port(m32)
        try:
            # The callback does the work; just keep the process alive.
            while True:
                sleep(1)
        except KeyboardInterrupt:
            print("Ctrl-C")
        finally:
            # Close the port however the loop ended.
            midiin.close_port()

    def handle(self, event, data):
        """MIDI callback: event is a (message bytes, delta time) pair."""
        msg, dt = event
        status = msg[0] & 0xF0  # drop the channel nibble
        print(msg, dt)
        if status == 0x90:
            if msg[2] == 0:
                # Note-on with velocity 0 is a note-off in MIDI 1.0; don't
                # fire the script a second time when the key is released.
                return
            print("note")
            self.handle_noteon(*msg)

    def handle_noteon(self, a, b, c):
        """Send the note number b to /note as a string.

        a (status byte) and c (velocity) are not used.
        """
        note = str(b)
        addr = "/note"
        print("sendnote", addr, note)
        self.client.send_message(addr, note)


if __name__ == "__main__":
    bobbins = Bobbins()
    bobbins.main()
```