Dup Ver Goto 📝

Bobbins OSC Server

To
154 lines, 478 words, 3949 chars Page 'BobbinsServer' does not exist.

I don't know how I ended up with the name bobbins, but it stuck. At first, I just wanted to run macros and scripts triggered by a MIDI keyboard while running non-music software. This could be anything from play/pause (etc.) on my music player to play/pause on a VLC instance (via telnet) when pressing the soft pedal (useful for transcribing text, and the MIDI keys can be used to skip by various amounts). I hit an odd problem: for rtmidi to work on Windows I needed to use Anaconda Python rather than Cygwin Python. But for the scripts I wanted to use Cygwin (so I could use bash and my array of *nix scripts). So the approach I ended up with was to run rtmidi in a Python process via Anaconda, and send OSC to somewhere else to run a script.

Server

#!/usr/bin/env python
import math
from subprocess import run
from pythonosc.dispatcher import Dispatcher
from pythonosc import osc_server
from threading import Thread
import sys
import os

class Bobbins:
  """OSC server that turns every incoming message into a command invocation.

  Any OSC message other than ``/cmd`` runs the configured command as
  ``<cmd> <address without leading slashes> <arg> ...`` with every argument
  converted to a string. The address ``/cmd`` is reserved: with an argument
  it replaces the command, without one it prints the current command.

  NOTE(review): ``/cmd`` lets anyone who can send OSC to this server choose
  the executable that gets run; only expose the port to hosts you trust.
  """

  def __init__(self,host,port,cmd="echo"):
    """Remember the listen address and the command to run per message.

    host -- address the UDP server binds to
    port -- UDP port the server binds to
    cmd  -- command to run; a name without "/" is run as "./<name>"
    """
    self.cmd = cmd
    self.host = host
    self.port = port
    # Objects waited on by join(); nothing in this class adds to the list.
    self.commands = []

  def set_cmd(self,cmd):
    """Replace the command run for subsequent messages."""
    self.cmd = cmd

  def handle_osc(self,addr,*params):
    """Default OSC handler: update the command or run it for this message.

    addr   -- OSC address of the message (leading slashes are stripped)
    params -- OSC arguments; passed to the command as strings
    """
    print("osc",addr,params)
    addr = addr.lstrip("/")
    if addr == "cmd":
      if params:
        # OSC arguments may be ints or floats; the command has to be a
        # string for the "/" test and for subprocess below.
        self.cmd = str(params[0])
        print(f"Command now {self.cmd}")
      else:
        print(f"Command is {self.cmd}")
      return
    # A bare name is resolved against the working directory, not $PATH.
    cmd = self.cmd if "/" in self.cmd else "./"+self.cmd
    cmdline = [cmd,addr,*(str(x) for x in params)]
    print(f"Running {cmdline}")
    try:
      run(cmdline)
    except OSError as err:
      # Missing or non-executable script: report it and keep serving
      # instead of letting the exception escape the handler.
      print(f"Failed {cmdline}: {err}")
      return
    print(f"Finished {cmdline}")

  def join(self):
    """Wait for every entry in self.commands by calling its join()."""
    for command in self.commands:
      command.join()

  def start(self):
    """Bind the UDP server and serve OSC messages until interrupted."""
    dispatcher = Dispatcher()
    dispatcher.set_default_handler(self.handle_osc)

    server = osc_server.BlockingOSCUDPServer( # Threading server hangs with cygwin
        (self.host, self.port), dispatcher)
    print(f"Serving on {server.server_address}")
    print(f"Command is {self.cmd}")
    server.serve_forever()

def main():
  """Run the Bobbins OSC server.

  The command to run is the first command-line argument (default
  "./bobbins"). The UDP port comes from the PORT environment variable
  (default 4001) and must be an integer from 1024 to 32000; otherwise an
  error is printed and the process exits with status 1. The server binds to
  all interfaces ("0.0.0.0").
  """
  args = sys.argv[1:]
  cmd = args[0] if args else "./bobbins"

  port_text = os.getenv("PORT","4001")
  try:
    port = int(port_text)
  except ValueError:
    port = None
  if port is None or not 1024 <= port <= 32000:
    print(f"Invalid port {port_text}")
    # sys.exit, not the builtin exit(): that helper is installed by the
    # site module for interactive use and is not guaranteed to exist.
    sys.exit(1)

  host = "0.0.0.0"
  bobbins = Bobbins(host,port,cmd=cmd)
  bobbins.start()

# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
  main()

Example Bobbins Script

#!/bin/bash
# Example handler run by the Bobbins server.
#   $1 - OSC address without the leading slash (e.g. "note")
#   $2 - first OSC argument (for "note": the note number)
echo "BOBBINS $@"

event="$1"
note="$2"

# Only "note" events are mapped to actions; everything else is just logged.
if [ "$event" != "note" ]; then
  exit 0
fi

case "$note" in
  58) tvlcmd hostname index f ;;
  59) tvlcmd hostname index pause ;;
esac

MIDI to OSC Bobbins

#!/usr/bin/env python
from rtmidi import MidiIn
from pythonosc import udp_client
from time import sleep
import json
from icecream import ic; ic.configureOutput(includeContext=True)
import re
import sys
import os

class Bobbins:
  """Forward MIDI note-on events from one input port to an OSC server."""

  def main(self):
    """Open the matching MIDI input port and forward events until Ctrl-C.

    Configuration comes from environment variables:
      BOBBINS_HOST -- OSC server host (default "pi3")
      BOBBINS_PORT -- OSC server UDP port (default 4001)
      DEV          -- substring that selects the MIDI input port by name
                      (default "M32"); the first matching port is used

    Exits with status 1 when no port name contains DEV.
    """
    host = os.getenv("BOBBINS_HOST","pi3")
    port = int(os.getenv("BOBBINS_PORT",4001))
    self.client = udp_client.SimpleUDPClient(host,port)

    pat = os.getenv("DEV","M32")
    midiin = MidiIn()
    for index,name in enumerate(midiin.get_ports()):
      if pat in name:
        break
    else:
      print(f"Cannot find {pat}")
      # sys.exit, not the builtin exit(): that helper is installed by the
      # site module for interactive use and is not guaranteed to exist.
      sys.exit(1)
    print(f"Found {index}:{name} for {pat}")
    midiin.set_callback(self.handle)
    midiin.open_port(index)

    try:
      # Events arrive through the callback; this loop only keeps the
      # process alive.
      while True:
        sleep(1)
    except KeyboardInterrupt:
      print("Ctrl-C")
    finally:
      # Release the port on every exit path, not only on Ctrl-C.
      midiin.close_port()

  def handle(self, event, data):
    """MIDI input callback: forward real note-on messages.

    event -- (message, delta_time) pair; message is a sequence of MIDI bytes
    data  -- user data passed by the MIDI library (unused)
    """
    msg, dt = event
    print(msg,dt)
    if len(msg) < 3:
      # Too short to be a note message.
      return
    status = msg[0]&0xF0  # drop the channel nibble
    if status != 0x90:
      return
    if msg[2] == 0:
      # Note On with velocity 0 is a Note Off by MIDI convention; without
      # this check a key release would trigger the action a second time.
      return
    print("note")
    self.handle_noteon(*msg[:3])

  def handle_noteon(self,a,b,c):
    """Send the note number to the OSC address "/note" as a string.

    a -- status byte (unused)
    b -- note number
    c -- velocity (unused)
    """
    note = str(b)
    addr = "/note"
    print("sendnote",addr,note)
    self.client.send_message(addr,note)

# Run the MIDI-to-OSC forwarder only when executed as a script, not on import.
if __name__ == "__main__":
  bobbins = Bobbins()
  bobbins.main()