hh_base.py
import sys
from icecream import ic; ic.configureOutput(includeContext=True)
from datetime import datetime
from rtmidi import MidiIn, MidiOut
from hubs import HubBase
class DeviceBase:
    """Abstract base class for every kind of device.

    MIDI devices derive from MidiDeviceBase; a device speaking some other
    transport (for example an arduino with a custom serial protocol) would
    derive from this class directly.
    """

    def __init__(self, name="Device"):
        """Store the device name and install a placeholder delegate."""
        # Start with a "Null" delegate so events arriving before setDelegate()
        # is called still have somewhere to go.
        placeholder = DelegateBase(None, None, name="Null")
        self.name = name
        self.delegate = placeholder

    def setDelegate(self, delegate):
        """Replace the current delegate and report the change on stdout."""
        self.delegate = delegate
        print(f"DeviceBase({self.name}) delegate now: {self.delegate.name}")

    def __del__(self):
        """Nothing to release at this level."""
        pass
class MidiDevice:
    """Handle a midi device.

    Owns one MidiIn and one MidiOut port so that classes dealing with MIDI
    devices do not have to duplicate the open/close code.
    """

    def __init__(self, inport, outport, callback):
        """Open the input and output ports and register the input callback.

        inport / outport are passed straight to open_port(); callback is
        passed straight to MidiIn.set_callback().
        """
        # Define both attributes up front: if anything below raises, __del__
        # still runs on the partly built object and must find them.
        self.midiin = None
        self.midiout = None
        self.midiin = MidiIn()
        self.midiout = MidiOut()
        self.midiin.set_callback(callback)
        self.midiin.open_port(inport)
        self.midiout.open_port(outport)

    def __del__(self):
        """Close whichever ports were actually created."""
        # getattr covers instances that never ran __init__ at all.
        for port in (getattr(self, "midiin", None), getattr(self, "midiout", None)):
            if port is not None:
                port.close_port()

    def send_message(self, message):
        """Send a raw MIDI message through the output port."""
        self.midiout.send_message(message)
class MidiDeviceBase(DeviceBase):
    """Handle the MIDI connection, delegate interpretation of events to subclasses.

    Incoming control-change, note-on and note-off messages are decoded in
    callback() and passed to the current delegate. Outgoing messages are
    built with the configured channel and sent through the MidiDevice.
    """

    def __init__(self, inport, outport, name="MidiDevice", midi_ch=0):
        """Open the MIDI ports and remember the channel used for sending.

        midi_ch is masked to its low four bits, so it always ends up in 0..15.
        """
        print(125, inport, outport, name)
        super().__init__(name=name)
        # The channel occupies the low nibble of a status byte. Mask it
        # rather than taking it modulo 15, which would turn channel 15 into 0.
        self.midi_ch = midi_ch & 0x0F
        self.midi_dev = MidiDevice(inport, outport, self.callback)

    def callback(self, msg, *xs):
        """Dispatch one incoming MIDI event to recv_cc / recv_on / recv_off.

        msg is a (message_bytes, timestamp) pair; any further positional
        arguments are ignored. Only the status nibble is examined: events on
        every channel are accepted, and statuses other than 0xB0, 0x80 and
        0x90 are dropped.
        """
        data, _timestamp = msg
        if not data:
            # Nothing to decode; avoid an IndexError on data[0].
            return None
        status = data[0] & 0xF0
        if status == 0xB0:
            return self.recv_cc(*data[1:3])
        if status == 0x80:
            return self.recv_off(*data[1:3])
        if status == 0x90:
            return self.recv_on(*data[1:3])
        return None

    def recv_cc(self, n, v):
        """Log and forward a control change (controller n, value v)."""
        print(f"delegate({self.delegate.name}) cc {n=} {v=}")
        self.delegate.recv_cc(n, v)

    def recv_on(self, p, v):
        """Log and forward a note-on (pitch p, velocity v)."""
        print(f"delegate({self.delegate.name}) on {p=} {v=}")
        self.delegate.recv_on(p, v)

    def recv_off(self, p, v):
        """Log and forward a note-off (pitch p, velocity v)."""
        print(f"delegate({self.delegate.name}) off {p=} {v=}")
        self.delegate.recv_off(p, v)

    def recv_osc(self, addr, params):
        """Log and forward an OSC message to the delegate."""
        print(f"osc {addr} {params=}")
        self.delegate.recv_osc(addr, params)
        print(f"cc {addr=} {params=}")

    def send_midi(self, msg):
        """Send a raw MIDI message (list of bytes) to the device."""
        self.midi_dev.send_message(msg)

    def send_cc(self, n, v):
        """Send control change n with value v on the configured channel."""
        msg = [0xB0 | self.midi_ch, n, v]
        print("cc", msg)
        self.send_midi(msg)

    def send_on(self, p, v):
        """Send note-on for pitch p with velocity v on the configured channel."""
        msg = [0x90 | self.midi_ch, p, v]
        print("on", msg)
        self.send_midi(msg)

    def send_off(self, p, v):
        """Send note-off for pitch p with velocity v on the configured channel."""
        msg = [0x80 | self.midi_ch, p, v]
        print("off", msg)
        self.send_midi(msg)
class DelegateBase:
    """Default delegate: every handler just logs the event it received.

    Subclasses override the handlers they care about.
    """

    def __init__(self, hub, device, name="Device"):
        """Remember the hub, the device and a display name."""
        self.hub = hub
        self.device = device
        self.name = name

    def __del__(self):
        """Nothing to release at this level."""
        pass

    # --- raw events --------------------------------------------------------

    def recv_cc(self, n, v):
        """Log a control change."""
        print(f"DelegateBase({self.name}): cc {n=} {v=}")

    def recv_on(self, p, v):
        """Log a note-on."""
        print(f"DelegateBase({self.name}): on {p=} {v=}")

    def recv_off(self, p, v):
        """Log a note-off."""
        print(f"DelegateBase({self.name}): off {p=} {v=}")

    def recv_osc(self, addr, params):
        """Log an OSC message."""
        print(f"DelegateBase({self.name}): osc {addr=} {params=}")

    # --- control-level events ----------------------------------------------

    def but(self, n, v):
        """Log a button event."""
        print(f"DelegateBase({self.name}): button: {n=} {v=}")

    def touch(self, n, v):
        """Log a touch event."""
        print(f"DelegateBase({self.name}): touch: {n=} {v=}")

    def rot(self, n, dv):
        """Log a rot event."""
        print(f"DelegateBase({self.name}): rot: {n=} {dv=}")

    def rrot(self, n, dv):
        """Log a rrot event."""
        print(f"DelegateBase({self.name}): rrot: {n=} {dv=}")

    def pot(self, n, v):
        """Log a pot event."""
        print(f"DelegateBase({self.name}): pot: {n=} {v=}")
hh_util.py
def clamp(x, a, b):
    """Return x limited to the range a..b (a is the lower bound, b the upper)."""
    # The lower bound is tested first, exactly as in the step-by-step form,
    # so the result is the same even if a caller passes a > b.
    return a if x < a else (b if x > b else x)
def decode_rel(x):
    """Convert relative to signed int.

    Values of 0x40 and above are shifted down by 0x80 (so 0x7F becomes -1
    and 0x40 becomes -64); smaller values are returned unchanged.
    """
    return x - 0x80 if x >= 0x40 else x
hubs.py
The classes that talk to midi devices forward their events to a hub class.
The hub class is responsible for actually sending e.g. OSC over the network.
This decouples the device management code from the networking code: device
delegate classes call the send_dx or send_but methods, and the hub knows
what to do with them. This allows us to later change the protocol used without
changing the device delegate code much.
import socket
from getip import get_ip_for_host
from pythonosc import udp_client
class HubBase:
    """Base class of Hedgehog hubs. Messages are relayed via the hub.

    This defines the interface for receiving messages from devices for
    forwarding. It does not include code for sending messages to devices, as
    that is the job of the subclass. But it does maintain a set of connected
    devices. Every send_* method here is a no-op for subclasses to override.
    """

    def __init__(self):
        """Start with no connected devices."""
        self.devices = set()

    def add_device(self, dev):
        """Register dev as connected (adding it twice has no further effect)."""
        self.devices.add(dev)

    def remove_device(self, dev):
        """Unregister dev; unknown devices are ignored."""
        self.devices.discard(dev)

    def send_note(self, ch, what, pitch, velocity, params=None):
        """Send a note event.

        what is the string 'on', 'off' or other. pitch is the note number:
        0..127 are midi pitches, other numbers can have other uses such as an
        additional level of channelising, or instruments that natively use hh
        as their event language rather than midi. velocity is a float
        normalised to 0.0..1.0 or an int normalised to 0..127. params are
        freeform (for now) and appended to the arg list; None means no extra
        params.
        """
        pass

    def send_x7(self, ch, i, x):
        """Send absolute value normalised to 0..127 as int."""
        pass

    def send_x(self, ch, i, x):
        """Send absolute value normalised to 0..1.0 as float."""
        pass

    def send_xb(self, ch, i, x):
        """Send bipolar absolute value normalised to -1.0..1.0 as float."""
        pass

    def send_xr(self, ch, i, x, a, b):
        """Send absolute value normalised to a..b, and also range."""
        pass

    def send_xi(self, ch, i, x):
        """Send absolute value as unnormalised int."""
        pass

    def send_xf(self, ch, i, f):
        """Send absolute value as unnormalised float."""
        pass

    def send_dx(self, ch, i, dx):
        """Send dx for control i on channel ch (presumably a relative change, as int)."""
        pass

    def send_dxf(self, ch, i, dx):
        """Float counterpart of send_dx."""
        pass

    def send_but(self, ch, i, v):
        """Send the state v of button i on channel ch."""
        pass

    def send_butf(self, ch, i, v):
        """Float counterpart of send_but (presumably for velocity-sensitive buttons)."""
        pass

    def send_key(self, ch, k, v):
        """Send key press from qwerty keyboard, k is acms combo like ACMx, v is down, up, or press."""
        pass

    def send_debug(self, msg, *xs):
        """Print a debug message; extra positional args are accepted and ignored."""
        print(f"Debug: {msg}")
def get_ip_for_host(host):
    """Resolve host to an IPv4 address string, or return None on failure.

    Failures are reported on stdout rather than raised, so callers only need
    to check for None.
    """
    try:
        candidates = socket.getaddrinfo(
            host,
            0,
            family=socket.AF_INET,
            type=socket.SOCK_STREAM,
        )
        # Each entry is (family, type, proto, canonname, sockaddr) and for
        # AF_INET sockaddr is (ip, port): take the ip of the first entry.
        first_sockaddr = candidates[0][4]
        return first_sockaddr[0]
    except socket.gaierror:
        print(f"socket.gaierror getting {host}")
    except Exception as e:
        print(f"Exception {e} getting {host}")
    return None
class HubOsc(HubBase):
    """Hub that forwards events as OSC messages over UDP.

    Messages go to outhost:outport under addresses of the form
    /hh/<ch>/<kind>/<i>. Debug messages are additionally sent to every
    debug host on debugport.
    """

    def __init__(self, inhost, inport, outhost, outport, debughosts=None, debugport=4009):
        """Resolve the hosts and create the UDP clients.

        debughosts may be None, a single host, or a list of hosts. An OSC
        client is only created when outhost resolves and outport >= 1024;
        otherwise the send_* methods silently do nothing.
        """
        super().__init__()
        self.inhost = get_ip_for_host(inhost)
        self.inport = inport
        self.outhost = get_ip_for_host(outhost)
        self.outport = outport
        # Always defined, so sendosc() can tell "no client" from a typo.
        self.client = None
        self.debugclients = []

        if debughosts is None:
            self.debughosts = []
        elif isinstance(debughosts, list):
            self.debughosts = debughosts
        else:
            self.debughosts = [debughosts]

        if self.outhost is None:
            print(f"Couldn't find ip for out host {outhost}")
        elif self.outport >= 1024:
            # self.outhost is already an IP address; no second lookup needed.
            self.client = udp_client.SimpleUDPClient(self.outhost, outport)

        for debughost in self.debughosts:
            ip = get_ip_for_host(debughost)
            if ip is not None:
                self.debugclients.append(udp_client.SimpleUDPClient(ip, debugport))
            else:
                print(f"Couldn't find ip for debug host {debughost}")

        # Receiving is not implemented; warn when the caller configured an
        # input endpoint (check the *in* port, not the out port).
        if self.inhost is not None and self.inport is not None and self.inport >= 1024:
            print("In host not supported yet")

    def sendosc(self, addr, *args):
        """Send args to OSC address addr; a no-op when no client was created."""
        if self.client is None:
            return
        self.client.send_message(addr, args)

    def send_note(self, ch, what, pitch, velocity, params=None):
        """Send /hh/<ch>/note/<what> with pitch, velocity and any extra params."""
        addr = f"/hh/{ch}/note/{what}"
        extra = [] if params is None else list(params)
        self.sendosc(addr, pitch, velocity, *extra)

    def send_x7(self, ch, i, x):
        """Send absolute value normalised and clamped to 0..127 as int."""
        addr = f"/hh/{ch}/x7/{i}"
        x = min(127, max(0, int(x)))
        self.sendosc(addr, x)

    def send_x(self, ch, i, x):
        """Send absolute value normalised to 0..1.0 as float."""
        addr = f"/hh/{ch}/x/{i}"
        self.sendosc(addr, x)

    def send_xb(self, ch, i, x):
        """Send bipolar absolute value normalised to -1.0..1.0 as float."""
        addr = f"/hh/{ch}/xb/{i}"
        self.sendosc(addr, x)

    def send_xr(self, ch, i, x, a, b):
        """Send absolute value normalised to a..b, and also range."""
        addr = f"/hh/{ch}/xr/{i}"
        self.sendosc(addr, x, a, b)

    def send_xi(self, ch, i, x):
        """Send absolute value as unnormalised int."""
        addr = f"/hh/{ch}/xi/{i}"
        self.sendosc(addr, x)

    def send_xf(self, ch, i, x):
        """Send absolute value x as unnormalised float."""
        addr = f"/hh/{ch}/xf/{i}"
        self.sendosc(addr, x)

    def send_dx(self, ch, i, dx):
        """Send dx, where dx is an int."""
        addr = f"/hh/{ch}/dx/{i}"
        self.sendosc(addr, dx)

    def send_dxf(self, ch, i, dx):
        """Send dx, where dx is a float."""
        addr = f"/hh/{ch}/dxf/{i}"
        self.sendosc(addr, dx)

    def send_but(self, ch, i, v):
        """Send button state: v is 0 for off, anything else is sent as 1.

        Use send_butf for velocity sensitivity.
        """
        addr = f"/hh/{ch}/but/{i}"
        v = 1 if int(v) != 0 else 0
        self.sendosc(addr, v)

    def send_butf(self, ch, i, v):
        """Send button state with velocity: 0 for off, nonzero for on.

        v is clamped to 0.0..1.0 before sending.
        """
        # NOTE: a receiver converting this to a MIDI velocity should scale to
        # 0..127 and bump any nonzero v that rounds to 0 up to 1, so a soft
        # press is not mistaken for "off".
        addr = f"/hh/{ch}/butf/{i}"
        v = min(1.0, max(0.0, v))
        self.sendosc(addr, v)

    def send_key(self, ch, k, v):
        """Send key event k; v must be "down", "up" or "press", else nothing is sent."""
        addr = f"/hh/{ch}/key/{k}"
        if v not in ("down", "up", "press"):
            return
        self.sendosc(addr, v)

    def send_debug(self, msg, *xs):
        """Print msg and send it, with any extra args, to every debug client."""
        print(f"Debug: {msg}")
        msg = str(msg)
        addr = "append"
        params = [msg] + list(xs)
        for debugclient in self.debugclients:
            debugclient.send_message(addr, params)