Dup Ver Goto 📝

Hedgehog001

PT2/hedgehog hedgehog does not exist
To
306 lines, 1043 words, 9863 chars Page 'Hedgehog001' does not exist.

hh_base.py

import sys
from icecream import ic; ic.configureOutput(includeContext=True)
from datetime import datetime
from rtmidi import MidiIn, MidiOut
from hubs import HubBase

class DeviceBase:
  """Abstract Base Class of devices
  if we ever make something with an arduino using a custom serial protocol,
  then we'll inherit from this: MIDI devices inherit from MidiDeviceBase
  """
  def __init__(self,name="Device"):
    self.name = name
    self.delegate = DelegateBase(None,None,name="Null")
  def setDelegate(self,delegate):
    self.delegate = delegate
    print(f"DeviceBase({self.name}) delegate now: {self.delegate.name}")
  def __del__(self):
    pass

class MidiDevice:
  "Handle a midi device. Save duplication of code in other classes that handle midi devices."
  def __init__(self,inport,outport,callback):
    self.midiin = MidiIn()
    self.midiout = MidiOut()
    self.midiin.set_callback(callback)
    self.midiin.open_port(inport)
    self.midiout.open_port(outport)
  def __del__(self):
    self.midiin.close_port()
    self.midiout.close_port()
  def send_message(self,message):
    self.midiout.send_message(message)

class MidiDeviceBase(DeviceBase):
  "Handle the MIDI connection, delegate interpretation of events to subclasses"
  def __init__(self,inport,outport,name="MidiDevice",midi_ch=0):
    print(125,inport,outport,name)
    super().__init__(name=name) 
    self.midi_ch = midi_ch % 0x0f
    self.midi_dev = MidiDevice(inport,outport,self.callback)

  def callback(self,msg,*xs):
    msg, ts = msg
    status = msg[0] & 0xF0
    channel = msg[0] & 0xF0
    if status == 0xB0:
      return self.recv_cc(*msg[1:3])
    if status == 0x80:
      return self.recv_off(*msg[1:3])
    if status == 0x90:
      return self.recv_on(*msg[1:3])

  def recv_cc(self,n,v):
    print(f"delegate({self.delegate.name}) cc {n=} {v=}")
    self.delegate.recv_cc(n,v)
  def recv_on(self,p,v):
    print(f"delegate({self.delegate.name}) on {p=} {v=}")
    self.delegate.recv_on(p,v)
  def recv_off(self,p,v):
    print(f"delegate({self.delegate.name}) off {p=} {v=}")
    self.delegate.recv_off(p,v)
  def recv_osc(self,addr,params):
    print(f"osc {addr} {params=}")
    self.delegate.recv_osc(addr,params)
    print(f"cc {addr=} {params=}")

  def send_midi(self,msg):
    self.midi_dev.send_message(msg)

  def send_cc(self,n,v):
    msg = [ 0xB0 | self.midi_ch, n, v ]
    print("cc",msg)
    self.send_midi(msg)

  def send_on(self,p,v):
    msg = [ 0x90 | self.midi_ch, p, v ]
    print("on",msg)
    self.send_midi(msg)

  def send_off(self,p,v):
    msg = [ 0x80 | self.midi_ch, p, v ]
    print("off",msg)
    self.send_midi(msg)

class DelegateBase:
  def __init__(self,hub,device,name="Device"):
    self.name = name
    self.device = device
    self.hub = hub
  def __del__(self):
    pass
  def recv_cc(self,n,v):
    print(f"DelegateBase({self.name}): cc {n=} {v=}")
  def recv_on(self,p,v):
    print(f"DelegateBase({self.name}): on {p=} {v=}")
  def recv_off(self,p,v):
    print(f"DelegateBase({self.name}): off {p=} {v=}")
  def recv_osc(self,addr,params):
    print(f"DelegateBase({self.name}): osc {addr=} {params=}")
  def but(self,n,v):
    print(f"DelegateBase({self.name}): button: {n=} {v=}")
  def touch(self,n,v):
    print(f"DelegateBase({self.name}): touch: {n=} {v=}")
  def rot(self,n,dv):
    print(f"DelegateBase({self.name}): rot: {n=} {dv=}")
  def rrot(self,n,dv):
    print(f"DelegateBase({self.name}): rrot: {n=} {dv=}")
  def pot(self,n,v):
    print(f"DelegateBase({self.name}): pot: {n=} {v=}")

hh_util.py

def clamp(x,a,b):
  # could also do min(b,max(a,x)) if we want a one-liner
  if x < a:
    return a
  if x > b:
    return b
  return x

def decode_rel(x):
  "Convert relative to signed int"
  if x >= 0x40:
    return x - 0x80
  return x

hubs.py

The classes that talk to midi devices forward their events to a hub class. The hub class is responsible for actually sending e.g. OSC over the network. This decouples the device management code from the networking code: device delegate classes call the send_dx or send_but methods, and the hub knows what to do with them. This allows us to later change the protocol used without changing the device delegate code much.

import socket
from getip import get_ip_for_host
from pythonosc import udp_client

class HubBase:
  """Base class of Hedgehog hubs. Messages are relayed via the hub.
  This defines the interface for receiving messages from devices for forwarding.
  It does not include code for sending messages to devices, as that is the job
  of the subclass. But it does maintain a set of connected devices."""
  def __init__(self):
    self.devices = set()
  def add_device(self,dev):
    self.devices.add(dev)
  def remove_device(self,dev):
    if dev in self.devices:
      self.devices.remove(dev)
  def send_note(self,ch,what,pitch,velocity,params=[]):
    """Send a note event. what is string 'on', 'off' or other.
    pitch is note number: 0..127 are midi pitches, other
    numbers can have other uses such as additional level
    of channelising, or instruments that natively use hh
    as their event language rather than midi. velocity
    is a float normalised to 0.0..1.0 or an int normalised
    to 0..127, params are freeform (for now) and appened
    to arg list"""
    pass
  def send_x7(self,ch,i,x):
    "Send absolute value normalised to 0..127 as int"
    pass
  def send_x(self,ch,i,x):
    "Send absolute value normalised to 0..1.0 as float"
    pass
  def send_xb(self,ch,i,x):
    "Send bipolar absolute value normalised to -1.0..1.0 as float"
    pass
  def send_xr(self,ch,i,x,a,b):
    "Send absolute value normalised to a..b, and also range"
    pass
  def send_xi(self,ch,i,x):
    "Send absolute value as unnormalised int"
    pass
  def send_xf(self,ch,i,f):
    "Send absolute value as unnormalised float"
    pass
  def send_dx(self,ch,i,dx):
    pass
  def send_but(self,ch,i,v):
    pass
  def send_key(self,ch,k,v):
    "Send key press from qwerty keyboard, k is acms combo like ACMx, v is down, up, or press"
    pass
  def send_debug(self,msg):
    print(f"Debug: {msg}")

def get_ip_for_host(host):
  try:
    info = socket.getaddrinfo(host,0,family=socket.AF_INET,type=socket.SOCK_STREAM)
    ip = info[0][4][0]
    return ip
  except socket.gaierror:
    print(f"socket.gaierror getting {host}")
    return None
  except Exception as e:
    print(f"Exception {e} getting {host}")
    return None

class HubOsc(HubBase):
  def __init__(self,inhost,inport,outhost,outport,debughosts=None,debugport=4009):
    super().__init__()
    inhost = get_ip_for_host(inhost)
    outhost = get_ip_for_host(outhost)
    self.inhost = inhost
    self.inport = inport
    self.outhost = outhost
    self.outport = outport
    self.debughosts = []
    self.debugclients = []
    if debughosts is None:
      self.debughosts = []
    elif isinstance(debughosts,list):
      self.debughosts = debughosts
    else:
      self.debughosts = [debughosts]
    if self.outhost is not None and self.outport >= 1024:
      ip = get_ip_for_host(self.outhost)
      if ip is not None:
        self.client = udp_client.SimpleUDPClient(ip,outport)
      else:
        print(f"Couldn't find ip for out host {outhost}")
    for debughost in self.debughosts:
      ip = get_ip_for_host(debughost)
      if ip is not None:
        self.debugclients.append(udp_client.SimpleUDPClient(ip,debugport))
      else:
        print(f"Couldn't find ip for debug host {debughost}")
    if self.inhost is not None and self.outport >= 1024:
      print(f"In host not supported yet")

  def sendosc(self,addr,*args):
    self.client.send_message(addr,args)

  def send_note(self,ch,what,pitch,velocity,params=[]):
    addr = f"/hh/{ch}/note/{what}"
    params = [pitch,velocity]+params
    self.sendosc(addr,*params)
  def send_x7(self,ch,i,x):
    "Send absolute value normalised and clamped to 0..127 as int"
    addr = f"/hh/{ch}/x7/{i}"
    x = min(127,max(0,int(x)))
    self.sendosc(addr,int(x))
  def send_x(self,ch,i,x):
    "Send absolute value normalised to 0..1.0 as float"
    addr = f"/hh/{ch}/x/{i}"
    self.sendosc(addr,x)
  def send_xb(self,ch,i,x):
    "Send bipolar absolute value normalised to -1.0..1.0 as float"
    addr = f"/hh/{ch}/xb/{i}"
    self.sendosc(addr,x)
  def send_xr(self,ch,i,x,a,b):
    "Send absolute value normalised to a..b, and also range"
    addr = f"/hh/{ch}/xr/{i}"
    self.sendosc(addr,x,a,b)
  def send_xi(self,ch,i,x):
    "Send absolute value as unnormalised int"
    addr = f"/hh/{ch}/xi/{i}"
    self.sendosc(addr,x)
  def send_xf(self,ch,i,x):
    "Absolute value x unnormalised float"
    addr = f"/hh/{ch}/xf/{i}"
    self.sendosc(addr,x)
  def send_dx(self,ch,i,dx):
    "dx is int" 
    addr = f"/hh/{ch}/dx/{i}"
    self.sendosc(addr,dx)
  def send_dxf(self,ch,i,dx):
    "dx is float" 
    addr = f"/hh/{ch}/dxf/{i}"
    self.sendosc(addr,dx)
  def send_but(self,ch,i,v):
    "v is 0 for off, 1 for on" # use send_butf for velocity sensitivity
    addr = f"/hh/{ch}/but/{i}"
    v = int(v)
    if v != 0:
      v = 1
    self.sendosc(addr,v)
  def send_butf(self,ch,i,v):
    "v is 0 for off, nonzero for on, velocity normalised to 0.0..1.0" 
    # when converting to midi note value, we take w = int(v / 127.0) and if v != 0.0 yet w = 0, we set w = 1
    addr = f"/hh/{ch}/butf/{i}"
    v = min(1.0,max(0.0,v))
    self.sendosc(addr,v)  
  def send_key(self,ch,k,v):
    "combos we send key non modifier key is pressed"
    addr = f"/hh/{ch}/key/{k}"
    if not v in ["down","up","press"]:
      return 
    self.sendosc(addr,v)
  def send_debug(self,msg,*xs):
    print(f"Debug: {msg}")
    msg = str(msg)
    addr = f"append"
    params = [msg]+list(xs)
    for debugclient in self.debugclients:
      debugclient.send_message(addr,params)