Dup Ver Goto 📝

An Array of Three Nocturns

To
478 lines, 1627 words, 13800 chars Page 'ThreeNocturnArray' does not exist.

The nocturn can't be used as a midi device with Windows or Macos. So what I do is to plug them into a Raspberry Pi (a 2B at the moment), and that sends OSC to my music PC, which then turns the OSC into midi and does things like retaining CC values. The 48(=16×3) buttons have a dual function: they send notes or CCs when pressed, and also function as bank selectors (touch the middle encoder and press a button to choose a bank). One issue is the order in which the pi enumerates them. The pi-side has an OSC server so you can send /enum messages to change which array element each nocturn corresponds to. Note that this is a quick hack which suffices for my needs, rather than a properly written work of software engineering.

This uses the NocturnHedgehog code I wrote a while back. That manages the midi devices, and translates knob movements and button presses into a simple enumeration so that the rotaries are just numbered 0..whatever, and likewise the buttons.

All in all, the server-side and recipient-side code is under 1000 lines. At some point, if the design matures, it might be worth rewriting into something more professional and organised. For now it's a heath-robinson-rube-goldberg-hack-job.

Sender-side

Run this on the Pi. It requires python-osc and python-rtmidi to be installed. It requires a recent version of python-osc which allows you to specify the address family when connecting (as the recipient-side requires that the UDP be sent via Ipv4).

This code is currently hardwired for three nocturns, rather than trying to be a generic solution that works for an arbitrary number of them.

# Gang the nocturns together into a single array.
# nocturn 0 rrot does granularity
# touch rrot + button does channel

from time import sleep
from rtmidi import MidiIn, MidiOut
from hh import *
from hubs import HubOsc
from hh_util import decode_rel
from hh_nocturn import NocturnDelegateBase
from pythonosc.dispatcher import Dispatcher
from pythonosc.osc_server import BlockingOSCUDPServer
from threading import Thread
import sys
from setproctitle import setproctitle
setproctitle("hh_noctarr")

if len(sys.argv) > 1:
  target = sys.argv[1]
else:
  target = "behemoth"

def dechex(x):
  return f"{x=}=0x{x:x}"

midiin = MidiIn()
midiout = MidiOut()

should_quit = False
class OscServer:
  def __init__(self, ip="0.0.0.0", port=2809):
    self.ip = ip
    self.port = port
    self.should_quit = False
    self.thread = Thread(target=self.task)
  def dispatch(self,addr,*args):
    global set_nocturn_index
    ic("osc",addr,args)
    addr = addr.strip("/")
    if addr == "enum":
      print("Setting nocturn index")
      set_nocturn_index = True 
    elif addr == "setrot":
      try:
        i,v = args
        i = int(i)
        if isinstance(v,int):
          v = min(127,max(0,v)) # clamp
        elif isinstance(v,float):
          v = min(1.0,max(0.0,v))
          v = int(127.9*v)
      except ValueError as e:
        print(f"Invalid setrot args {args}",e)
      try:
        return nocturnarray.set_rot(i,v)
      except Exception as e:
        print(f"Exception {e} calling nocturnarray.setrot")
    elif addr == "setrrot":
      try:
        i,v = args
        i = int(i)
        if isinstance(v,int):
          v = min(127,max(0,v)) # clamp
        elif isinstance(v,float):
          v = min(1.0,max(0.0,v))
          v = int(127.9*v)
      except ValueError as e:
        print(f"Invalid setrot args {args}",e)
      try:
        return nocturnarray.set_rrot(i,v)
      except Exception as e:
        print(f"Exception {e} calling nocturnarray.setrot")
  def start(self):
    return self.thread.start()
  def join(self):
    print("Joining OSC thread")
    return self.thread.join()
  def task(self):
    ip = self.ip
    port = self.port
    self.dispatcher = Dispatcher()
    self.dispatcher.set_default_handler(self.dispatch)
    server = BlockingOSCUDPServer((ip, port), self.dispatcher)
    server.timeout = 1.0
    try:
      print(f"OSC Listening on {ip}:{port}")
      while not should_quit:
        server.handle_request()
      print("OSC Thread Quitting")
    except KeyboardInterrupt:
      print("Ctrl-C")
      exit()

def get_midi_ports_matching(pat):
  ins = []
  outs = []
  for i,x in enumerate(midiin.get_ports()):
    if pat in x:
      ins.append(i)
  for i,x in enumerate(midiout.get_ports()):
    if pat in x:
      outs.append(i)
  ports = list(zip(ins,outs))
  if len(ports) == 0:
    print(f"No ports matchng {pat}")
  return ports

set_nocturn_index = False
class NocturnArray:
  def __init__(self,hub,nocturns):
    """
    parameter is an array of NocturnArranElements

    this end handles channels,
    integrating to prouce cc values is done
    on the other end
    """
    self.hub = hub
    self.nocturns = list(nocturns)
    self.channel = 0
    self.nrots = 8
    self.nbuts = 16
    self.nchannels = self.nbuts * len(self.nocturns)
    self.set_channel(0)
    for nocturn in nocturns:
      nocturn.delegate.set_target(self)
  def set_channel(self,ch):
    if ch < 0:
      return
    if ch >= self.nchannels:
      return
    self.channel = ch
    noct_nidx = ch // self.nbuts
    noct_bidx = ch % self.nbuts
    for i,nocturn in enumerate(self.nocturns):
      for j in range(self.nbuts):
        if nocturn.delegate.array_index == noct_nidx and j == noct_bidx:
          print("set on",noct_nidx,noct_bidx)
          nocturn.delegate.set_but(j,1)
        else:
          nocturn.delegate.set_but(j,0)
  def set_rot(self,i,v):
    nocturns = {}
    for nocturn in self.nocturns:
      nocturns[nocturn.delegate.array_index] = nocturn.delegate
    nidx = i // 8
    ridx = i % 8
    if nidx in nocturns:
      nocturns[nidx].set_rot(ridx,v)
    else:
      print("Invalid nocturn array index",i)
  def set_rrot(self,i,v):
    nocturns = {}
    for nocturn in self.nocturns:
      nocturns[nocturn.delegate.array_index] = nocturn.delegate
    if i in nocturns:
      nocturns[i].set_rrot(v)
    else:
      print("Invalid nocturn array index",i)


  def but(self,nocturn_idx,i,x):
    rtouch = False
    idx = nocturn_idx*16+i
    for nocturn in self.nocturns:
      if nocturn.delegate.touch_state[8]:
        rtouch = True
        break 
    if rtouch:
      return self.set_channel(idx)
    # send but event to target
    idx = nocturn_idx * self.nbuts + i
    self.hub.send_but(self.channel,idx,x)
  def rot(self,nocturn_idx,i,dv):
    idx = nocturn_idx * self.nrots + i
    self.hub.send_dx(self.channel,idx,dv)
  def rrot(self,nocturn_idx,dv):
    idx = nocturn_idx + 64
    self.hub.send_dx(self.channel,idx,dv)

# See NocturnHedgehog for NocturnDelegateBase
class NocturnArrayElement(NocturnDelegateBase):
  def __init__(self,hub,nocturn,array_index=0,name="Noname"):
    super().__init__(hub,nocturn,name)
    self.nocturn = nocturn
    self.target = None
    self.array_index = array_index
    self.rrot_gran = 2
    self.rot_gran = 2
    self.rrot_val = 0
    self.rot_vals = [0]*self.num_rots
    self.touch_state = [0]*9
  def show_1but(self,n):
    if n > 15:
      return
    if n < 0:
      return
    for i in range(16):
      self.set_but(i,127 if i == n else 0)
  def set_target(self,target):
    self.target = target
  def init(self):
    self.activate()
  def touch(self,n,v):
    self.touch_state[n] = 1 if v > 0 else 0
  def but(self,i,x):
    global set_nocturn_index 
    if set_nocturn_index:
      if i < 3 and x > 0:
        self.array_index = i
        set_nocturn_index = False
        self.nocturn.delegate.set_all_rots(0)
        self.nocturn.delegate.set_rot(i,127)
      return
    if not self.target:
      return
    self.target.but(self.array_index,i,1 if x > 0 else 0)
  def rot(self,i,v):
    print(f"element rot {i=} {dechex(v)}")
    if not self.target:
      return
    v = decode_rel(v)
    v0 = self.rot_vals[i] // self.rot_gran
    self.rot_vals[i] = (self.rot_vals[i] + v)
    v1 = self.rot_vals[i] // self.rot_gran
    dv = v1 - v0
    print(f"rot {self.name=} {i=} {dechex(v)} {self.rot_vals[i]=} {dv=}")
    if dv != 0:
      self.target.rot(self.array_index,i,dv)
  def rrot(self,v):
    print(f"element rrot {dechex(v)}")
    if not self.target:
      return
    v = decode_rel(v)
    old = self.rrot_val
    oldx = old // self.rrot_gran
    self.rrot_val = self.rrot_val + v
    newx = self.rrot_val // self.rrot_gran
    dx = newx - oldx
    if dx != 0:
      self.target.rrot(self.array_index,dx)
  def deactivate(self):
    self.set_all_buts(0)
    self.set_all_rots(0)
  def activate(self):
    self.set_all_buts(0)
    self.set_all_rots(0)
    self.set_rrot(0)


def setup_nocturns(ios,hub):
  if len(ios) < 1:
    return

  for idx,io in enumerate(ios):
    i,o = io
    nocturn = NocturnDeviceSwitch(i,o,f"Nocturn t.{idx}")
    delegate = NocturnArrayElement(hub,nocturn,array_index=idx,name=f"NocturnArray {idx}")
    delegate.init()
    nocturn.setDelegate(delegate)
    delegate.set_rot(idx,127)
    nocturns.append(nocturn)

  array = NocturnArray(hub,nocturns)
  return array


def main():
  global hub, nocturns, should_quit, nocturnarray

  hub = HubOsc(None,0,target,2808,debughosts=["t420c","behemoth"])
  nocturns = []
  nocturn_ios = get_midi_ports_matching("Nocturn")
  nocturnarray = setup_nocturns(nocturn_ios,hub)

  hhs = nocturns

  osc_server = OscServer(port=2809)
  osc_server.start()

  try:
    while True:
      sleep(1)
  except KeyboardInterrupt:
    print("Ctrl-C")
    for n in hhs:
      n.delegate.deactivate()
    should_quit = True
    exit(0)

if __name__ == "__main__":
  main()

Recipient-side hh_to_midi.py

On Windows (I only use this code on Windows), you need to set up a midi loopback (with e.g. Loopmidi), so that the DAW listens to this loopback and the recipient-side script receives OSC and sends MIDI to this loopback. I use e.g. rpc_in and abc_in as midi loopback names, one for Reaper, one for Ableton, so that they don't fight over trying to listen to the same device if they're both running at the same time. You specify a midi device name pattern when starting, e.g. python hh_to_midi.py rpc_in.

# companion to nocturn array
# aim here is to maintain state of all the ccs

# 48 channels and 24 knobs per channel
#                 48 buttons per channel
# 48 channels into 16 midi channels means 3 nocturn channels per midi channel
#             so  72 knobs per midi channel, each controlling a cc
#                120 buttons per midi channel, 40 per nocturn channel
#                    so the bottom row on the third nocturn is sent as cc
#                    so 80 ccs per midi channel, button row on third nocturn
#                    sets cc 72..79 to 127/0 on press/release

import os
import sys
import re
import socket
from icecream import ic; ic.configureOutput(includeContext=True)

from pythonosc.dispatcher import Dispatcher
from pythonosc.osc_server import BlockingOSCUDPServer
from pythonosc.udp_client import SimpleUDPClient

from rtmidi import MidiOut

midiout = MidiOut()
args = sys.argv[1:]
if len(args) > 0:
  rx = re.compile(args[0])
else:
  rx = re.compile("")

for i,x in enumerate(midiout.get_ports()):
  if rx.search(x):
    print(f"Selected {i}:{x}")
    break
else:
  print(f"No midi port matching {args[0]}")
  exit(1)
midi_port = i
midiout.open_port(midi_port)

"""
from rtmidi import MidiOut

midiout = MidiOut()
if len(midiout.get_ports()) == 0:
  print(f"No midi port")
  exit(1)
midiout.open_port(0)
"""

def clamp(m,M,x):
  return max(m,min(M,x))

# The dispatcher receives OSC and calls methods on an instance of this class.
class HhToMidi:
  def __init__(self,midiout):
    self.midiout = midiout 
    self.ccbase = 0
    self.notebase = 0
    self.nrots = 24
    self.nbuts = 48

    self.gran = 1
    self.ccs = [ [ 0 for x in range(72) ] for y in range(16) ]

  def send(self,*xs):
    print(f"send midi {xs=}")
    self.midiout.send_message(bytes(xs))

  def rrot(self,idx,dx):
    self.gran = clamp(1,8,self.gran + dx)
    print(f"{self.gran=}")

  def dx(self,ch,idx,dx):
    if idx >= 64:
      return self.rrot(idx-64,dx)

    cha = ch // 3
    chb = ch % 3

    ccno = chb * self.nrots + idx
    y = self.ccs[cha][ccno]
    y = clamp(0,127,y+dx*self.gran)
    self.ccs[cha][ccno] = y
    print(f"cc[{cha}][{ccno}] = {y}")

    self.send(0xB0+cha,ccno,y)

  def but(self,ch,idx,v):
    if idx >= 40:
      cha = ch // 3
      chb = ch % 3
      ccno = 72+(8*chb)+idx
      y = 127 if v > 0 else 0
      return self.send(0xB0+cha,ccno,y)
    cha = ch // 3
    chb = ch % 3
    pitch = chb*40+idx
    if v > 0:
      self.send(0x90+cha,pitch,127)
    else:
      self.send(0x80+cha,pitch,0)

  # OSC gets sent here
  def __call__(self,addr,*args):
    print(f"recv {addr}: {args}")
    xs = addr.split("/")
    if xs[1] != "hh":
      print(f"Not hh {addr}")
      return
    if len(xs) != 5:
      print(f"Invalid address {addr} (wrong length)")
      return
    try:
      ch = int(xs[2])
      idx = int(xs[4])
      if xs[3] == "dx":
        dx = args[0]
        if not isinstance(dx,int):
          print(f"dx not given int parameter")
          return
        return self.dx(ch,idx,dx)
      elif xs[3] == "but":
        v = args[0]
        if not ( v == 0 or v == 1 ):
          print(f"but should be 0 or 1, got {v=}")
          return
        self.but(ch,idx,v)
      else:
        print(f"Other hh: {addr}")
    except ValueError:
      print(f"Value error with {addr=} {args=}")

hh_to_midi = HhToMidi(midiout)

dispatcher = Dispatcher()
dispatcher.set_default_handler(hh_to_midi)

ip = os.getenv("HOST","0.0.0.0")
port = int(os.getenv("PORT",2808))

try:
  server = BlockingOSCUDPServer((ip, port), dispatcher)
  server.serve_forever()  # Blocks forever
except KeyboardInterrupt:
  print("Ctrl-C")
midiout.close_port()