I don't know how I ended up with the name bobbins, but it stuck. At first, I just wanted
to run macros and scripts triggered by a MIDI keyboard, when running non-music software.
This could be anything from play/pause/etc my music player, or play/pause a VLC instance
(via telnet) when pressing the soft pedal (useful for transcribing text, and the midi keys
can be used to skip by various amounts). I hit an odd problem: for rtmidi to work in Windows
I needed to use anaconda python rather than cygwin python. But for the scripts I wanted
to use cygwin (so I could use bash and my array of *nix scripts). So the approach I ended up
with was to run the rtmidi in a python process via anaconda, and send OSC to somewhere else
to run a script.
Server
#!/usr/bin/env python
import math
from subprocess import run
from pythonosc.dispatcher import Dispatcher
from pythonosc import osc_server
from threading import Thread
import sys
import os
class Bobbins:
def handle_osc(self,addr,*params):
print("osc",addr,params)
addr = addr.lstrip("/")
if addr == "cmd":
if len(params) > 0:
self.cmd = params[0]
print(f"Command now {self.cmd}")
else:
print(f"Command is {self.cmd}")
return
if not "/" in self.cmd:
cmd = "./"+self.cmd
else:
cmd = self.cmd
params = [str(x) for x in params]
cmdline = [cmd,addr,*params]
print(f"Running {cmdline}")
run(cmdline)
print(f"Finished {cmdline}")
def set_cmd(self,cmd):
self.cmd = cmd
def __init__(self,host,port,cmd="echo"):
self.cmd = cmd
self.host = host
self.port = port
self.commands = []
def join(self):
for command in self.commands:
command.join()
def start(self):
dispatcher = Dispatcher()
dispatcher.set_default_handler(self.handle_osc)
server = osc_server.BlockingOSCUDPServer( # Threading server hangs with cygwin
(self.host, self.port), dispatcher)
print("Serving on {}".format(server.server_address))
print(f"Command is {self.cmd}")
server.serve_forever()
def main():
args = sys.argv[1:]
if len(args) > 0:
cmd = args[0]
else:
cmd = "./bobbins"
try:
port = os.getenv("PORT",4001)
port = int(port)
if port < 1024 or port > 32000:
raise ValueError()
except ValueError:
print(f"Invalid port {port}")
exit(1)
host = "0.0.0.0"
bobbins = Bobbins(host,port,cmd=cmd)
bobbins.start()
if __name__ == "__main__":
main()
Example Bobbins Script
#!/bin/bash
echo "BOBBINS $@"
if [ "$1" = "note" ]; then
case "$2" in
59)
tvlcmd hostname index pause;;
58)
tvlcmd hostname index f;;
esac
fi
Midi to OSC Bobbins
#!/usr/bin/env python
from rtmidi import MidiIn
from pythonosc import udp_client
from time import sleep
import json
from icecream import ic; ic.configureOutput(includeContext=True)
import re
import sys
import os
class Bobbins:
def main(self):
host = os.getenv("BOBBINS_HOST","pi3")
port = int(os.getenv("BOBBINS_PORT",4001))
self.client = udp_client.SimpleUDPClient(host,port)
pat = os.getenv("DEV","M32")
midiin = MidiIn()
m32 = None
for i,x in enumerate(midiin.get_ports()):
if pat in x:
m32 = i
break
else:
print(f"Cannot find {pat}")
exit(1)
print(f"Found {m32}:{x} for {pat}")
midiin.set_callback(self.handle)
midiin.open_port(m32)
try:
while True:
sleep(1)
except KeyboardInterrupt:
print("Ctrl-C")
midiin.close_port()
def handle(self, event, data):
msg, dt = event
status = msg[0]&0xF0
print(msg,dt)
if status == 0x90:
print("note")
self.handle_noteon(*msg)
def handle_noteon(self,a,b,c):
note = str(b)
addr = "/note"
print("sendnote",addr,note)
self.client.send_message(addr,note)
if __name__ == "__main__":
bobbins = Bobbins()
bobbins.main()