tags: hedgehog

# `hh_base.py`

```py
import sys
from icecream import ic; ic.configureOutput(includeContext=True)
from datetime import datetime
from rtmidi import MidiIn, MidiOut
from hubs import HubBase


class DeviceBase:
    """Abstract Base Class of devices

    if we ever make something with an arduino using a custom
    serial protocol, then we'll inherit from this:
    MIDI devices inherit from MidiDeviceBase

    Every device starts with a placeholder DelegateBase named "Null",
    which only prints the events it receives, until setDelegate() is
    called.
    """

    def __init__(self, name="Device"):
        self.name = name
        self.delegate = DelegateBase(None, None, name="Null")

    def setDelegate(self, delegate):
        "Replace the current delegate and report the change on stdout."
        self.delegate = delegate
        print(f"DeviceBase({self.name}) delegate now: {self.delegate.name}")

    def __del__(self):
        pass


class MidiDevice:
    """Handle a midi device.

    Save duplication of code in other classes that handle midi devices.
    Opens one input and one output port and routes incoming messages to
    `callback`.
    """

    def __init__(self, inport, outport, callback):
        self.midiin = MidiIn()
        self.midiout = MidiOut()
        self.midiin.set_callback(callback)
        self.midiin.open_port(inport)
        self.midiout.open_port(outport)

    def __del__(self):
        # __init__ may have raised before both ports were created, in
        # which case the attribute is missing: only close what exists.
        for port in (getattr(self, "midiin", None),
                     getattr(self, "midiout", None)):
            if port is not None:
                port.close_port()

    def send_message(self, message):
        "Send a raw midi message (a list of byte values) to the output port."
        self.midiout.send_message(message)


class MidiDeviceBase(DeviceBase):
    "Handle the MIDI connection, delegate interpretation of events to subclasses"

    def __init__(self, inport, outport, name="MidiDevice", midi_ch=0):
        print(125, inport, outport, name)
        super().__init__(name=name)
        # Keep only the low nibble. The channel is OR-ed into the status
        # byte by the send_* methods, so it must fit in 4 bits (0..15).
        self.midi_ch = midi_ch & 0x0F
        self.midi_dev = MidiDevice(inport, outport, self.callback)

    def callback(self, msg, *xs):
        """Dispatch an incoming midi event to recv_cc / recv_on / recv_off.

        `msg` is a pair whose first item is the list of message bytes;
        the second item is not used here. Only control change (0xB0),
        note off (0x80) and note on (0x90) are handled; anything else is
        ignored. The channel (low nibble of the status byte) is not
        filtered: events on every channel are accepted.
        """
        msg, _ts = msg
        if not msg:
            return None
        status = msg[0] & 0xF0
        if status == 0xB0:
            return self.recv_cc(*msg[1:3])
        if status == 0x80:
            return self.recv_off(*msg[1:3])
        if status == 0x90:
            return self.recv_on(*msg[1:3])

    def recv_cc(self, n, v):
        "Log control change number n, value v, and forward it to the delegate."
        print(f"delegate({self.delegate.name}) cc {n=} {v=}")
        self.delegate.recv_cc(n, v)

    def recv_on(self, p, v):
        "Log note on pitch p, velocity v, and forward it to the delegate."
        print(f"delegate({self.delegate.name}) on {p=} {v=}")
        self.delegate.recv_on(p, v)

    def recv_off(self, p, v):
        "Log note off pitch p, velocity v, and forward it to the delegate."
        print(f"delegate({self.delegate.name}) off {p=} {v=}")
        self.delegate.recv_off(p, v)

    def recv_osc(self, addr, params):
        "Log an OSC message and forward it to the delegate."
        print(f"osc {addr} {params=}")
        self.delegate.recv_osc(addr, params)

    def send_midi(self, msg):
        "Send a raw midi message (list of byte values) to the device."
        self.midi_dev.send_message(msg)

    def send_cc(self, n, v):
        "Send control change number n with value v on this device's channel."
        msg = [0xB0 | self.midi_ch, n, v]
        print("cc", msg)
        self.send_midi(msg)

    def send_on(self, p, v):
        "Send note on for pitch p with velocity v on this device's channel."
        msg = [0x90 | self.midi_ch, p, v]
        print("on", msg)
        self.send_midi(msg)

    def send_off(self, p, v):
        "Send note off for pitch p with velocity v on this device's channel."
        msg = [0x80 | self.midi_ch, p, v]
        print("off", msg)
        self.send_midi(msg)


class DelegateBase:
    """Default event handler: every method just prints what it received.

    Subclasses override the methods they care about. `hub` and `device`
    are stored as given and are not used by this base class.
    """

    def __init__(self, hub, device, name="Device"):
        self.name = name
        self.device = device
        self.hub = hub

    def __del__(self):
        pass

    def recv_cc(self, n, v):
        print(f"DelegateBase({self.name}): cc {n=} {v=}")

    def recv_on(self, p, v):
        print(f"DelegateBase({self.name}): on {p=} {v=}")

    def recv_off(self, p, v):
        print(f"DelegateBase({self.name}): off {p=} {v=}")

    def recv_osc(self, addr, params):
        print(f"DelegateBase({self.name}): osc {addr=} {params=}")

    def but(self, n, v):
        print(f"DelegateBase({self.name}): button: {n=} {v=}")

    def touch(self, n, v):
        print(f"DelegateBase({self.name}): touch: {n=} {v=}")

    def rot(self, n, dv):
        print(f"DelegateBase({self.name}): rot: {n=} {dv=}")

    def rrot(self, n, dv):
        print(f"DelegateBase({self.name}): rrot: {n=} {dv=}")

    def pot(self, n, v):
        print(f"DelegateBase({self.name}): pot: {n=} {v=}")
```

# `hh_util.py`

```py
def clamp(x, a, b):
    "Return x limited to the range a..b (a if x < a, b if x > b)."
    # could also do min(b,max(a,x)) if we want a one-liner
    if x < a:
        return a
    if x > b:
        return b
    return x


def decode_rel(x):
    """Convert relative to signed int

    Values from 0x40 upwards have 0x80 subtracted, so 0x40..0x7F map to
    -64..-1; smaller values are returned unchanged.
    """
    if x >= 0x40:
        return x - 0x80
    return x
```

# `hubs.py`

The classes that talk to midi devices forward their events to a hub class. The hub class is responsible for actually sending e.g. OSC over the network. This decouples the device management code from the networking code: device delegate classes call the `send_dx` or `send_but` methods, and the hub knows what to do with them. This allows us to later change the protocol used without changing the device delegate code much.

```py
import socket
from getip import get_ip_for_host
from pythonosc import udp_client


class HubBase:
    """Base class of Hedgehog hubs.

    Messages are relayed via the hub. This defines the interface for
    receiving messages from devices for forwarding. It does not include
    code for sending messages to devices, as that is the job of the
    subclass. But it does maintain a set of connected devices.

    Every send_* method here is a no-op except send_debug, which prints.
    """

    def __init__(self):
        self.devices = set()

    def add_device(self, dev):
        self.devices.add(dev)

    def remove_device(self, dev):
        # discard() is a no-op when dev was never added.
        self.devices.discard(dev)

    def send_note(self, ch, what, pitch, velocity, params=None):
        """Send a note event.

        what is string 'on', 'off' or other. pitch is note number:
        0..127 are midi pitches, other numbers can have other uses such
        as additional level of channelising, or instruments that
        natively use hh as their event language rather than midi.
        velocity is a float normalised to 0.0..1.0 or an int normalised
        to 0..127, params are freeform (for now) and appended to arg list
        """
        pass

    def send_x7(self, ch, i, x):
        "Send absolute value normalised to 0..127 as int"
        pass

    def send_x(self, ch, i, x):
        "Send absolute value normalised to 0..1.0 as float"
        pass

    def send_xb(self, ch, i, x):
        "Send bipolar absolute value normalised to -1.0..1.0 as float"
        pass

    def send_xr(self, ch, i, x, a, b):
        "Send absolute value normalised to a..b, and also range"
        pass

    def send_xi(self, ch, i, x):
        "Send absolute value as unnormalised int"
        pass

    def send_xf(self, ch, i, f):
        "Send absolute value as unnormalised float"
        pass

    def send_dx(self, ch, i, dx):
        "Send relative change, dx is int"
        pass

    def send_dxf(self, ch, i, dx):
        "Send relative change, dx is float"
        pass

    def send_but(self, ch, i, v):
        "Send button state, v is 0 for off, 1 for on"
        pass

    def send_butf(self, ch, i, v):
        "Send button state with velocity, v is 0 for off, nonzero for on"
        pass

    def send_key(self, ch, k, v):
        "Send key press from qwerty keyboard, k is acms combo like ACMx, v is down, up, or press"
        pass

    def send_debug(self, msg, *xs):
        "Print a debug message; extra arguments are accepted and ignored."
        print(f"Debug: {msg}")


# NOTE: this definition replaces the get_ip_for_host imported from getip
# at the top of the file.
def get_ip_for_host(host):
    """Resolve host to an IPv4 address string.

    Returns the first address reported by socket.getaddrinfo, or None
    (after printing a message) when the lookup fails for any reason.
    """
    try:
        info = socket.getaddrinfo(host, 0, family=socket.AF_INET,
                                  type=socket.SOCK_STREAM)
        ip = info[0][4][0]
        return ip
    except socket.gaierror:
        print(f"socket.gaierror getting {host}")
        return None
    except Exception as e:
        # Deliberately best-effort: a hub should come up even if one
        # host cannot be resolved.
        print(f"Exception {e} getting {host}")
        return None


class HubOsc(HubBase):
    """Hub that forwards events as OSC messages over UDP.

    Events go to outhost:outport under /hh/<ch>/<kind>/<index>.
    Debug messages go to every host in debughosts on debugport.
    Receiving (inhost/inport) is not supported yet.
    """

    def __init__(self, inhost, inport, outhost, outport, debughosts=None, debugport=4009):
        super().__init__()
        self.inhost = get_ip_for_host(inhost)
        self.inport = inport
        self.outhost = get_ip_for_host(outhost)
        self.outport = outport
        # Stays None when no usable out host/port is configured;
        # sendosc() then drops messages instead of raising.
        self.client = None
        self.debugclients = []
        if debughosts is None:
            self.debughosts = []
        elif isinstance(debughosts, (list, tuple)):
            self.debughosts = list(debughosts)
        else:
            # A single host name was given.
            self.debughosts = [debughosts]

        if self.outhost is None:
            print(f"Couldn't find ip for out host {outhost}")
        elif self.outport >= 1024:
            self.client = udp_client.SimpleUDPClient(self.outhost, outport)

        for debughost in self.debughosts:
            ip = get_ip_for_host(debughost)
            if ip is not None:
                self.debugclients.append(udp_client.SimpleUDPClient(ip, debugport))
            else:
                print(f"Couldn't find ip for debug host {debughost}")

        if (self.inhost is not None and self.inport is not None
                and self.inport >= 1024):
            print("In host not supported yet")

    def sendosc(self, addr, *args):
        "Send args as one OSC message to addr; no-op when there is no out client."
        if self.client is None:
            return
        self.client.send_message(addr, args)

    def send_note(self, ch, what, pitch, velocity, params=None):
        "Send note event 'what' as [pitch, velocity] followed by any params"
        addr = f"/hh/{ch}/note/{what}"
        extra = [] if params is None else list(params)
        self.sendosc(addr, pitch, velocity, *extra)

    def send_x7(self, ch, i, x):
        "Send absolute value normalised and clamped to 0..127 as int"
        addr = f"/hh/{ch}/x7/{i}"
        x = min(127, max(0, int(x)))
        self.sendosc(addr, x)

    def send_x(self, ch, i, x):
        "Send absolute value normalised to 0..1.0 as float"
        addr = f"/hh/{ch}/x/{i}"
        self.sendosc(addr, x)

    def send_xb(self, ch, i, x):
        "Send bipolar absolute value normalised to -1.0..1.0 as float"
        addr = f"/hh/{ch}/xb/{i}"
        self.sendosc(addr, x)

    def send_xr(self, ch, i, x, a, b):
        "Send absolute value normalised to a..b, and also range"
        addr = f"/hh/{ch}/xr/{i}"
        self.sendosc(addr, x, a, b)

    def send_xi(self, ch, i, x):
        "Send absolute value as unnormalised int"
        addr = f"/hh/{ch}/xi/{i}"
        self.sendosc(addr, x)

    def send_xf(self, ch, i, x):
        "Absolute value x unnormalised float"
        addr = f"/hh/{ch}/xf/{i}"
        self.sendosc(addr, x)

    def send_dx(self, ch, i, dx):
        "dx is int"
        addr = f"/hh/{ch}/dx/{i}"
        self.sendosc(addr, dx)

    def send_dxf(self, ch, i, dx):
        "dx is float"
        addr = f"/hh/{ch}/dxf/{i}"
        self.sendosc(addr, dx)

    def send_but(self, ch, i, v):
        "v is 0 for off, 1 for on"
        # use send_butf for velocity sensitivity
        addr = f"/hh/{ch}/but/{i}"
        v = int(v)
        if v != 0:
            v = 1
        self.sendosc(addr, v)

    def send_butf(self, ch, i, v):
        "v is 0 for off, nonzero for on, velocity normalised to 0.0..1.0"
        # when converting to midi note value, we take w = int(v * 127)
        # and if v != 0.0 yet w = 0, we set w = 1
        addr = f"/hh/{ch}/butf/{i}"
        v = min(1.0, max(0.0, v))
        self.sendosc(addr, v)

    def send_key(self, ch, k, v):
        "Send key combo k; v must be 'down', 'up' or 'press', anything else is ignored"
        addr = f"/hh/{ch}/key/{k}"
        if v not in ("down", "up", "press"):
            return
        self.sendosc(addr, v)

    def send_debug(self, msg, *xs):
        "Print msg, then send it plus any extra args to every debug client"
        print(f"Debug: {msg}")
        msg = str(msg)
        addr = "append"
        params = [msg] + list(xs)
        for debugclient in self.debugclients:
            debugclient.send_message(addr, params)
```