When turning a knob on a controller and the update is expensive
(particular use case is connecting to VLC via the telnet interface each time+)
So when you turn a knob, it accumulates a rapid sequence of events into a single
one, e.g. with ten clicks of the knob we generate a single +10 event. The basic
idea is our accumulator is on its own thread, running once every second, say,
and other threads call its inc() method to increment/decrement its internal
counter dx.
(+) The alternative is to maintain an open telnet to the VLC rather than recreating.
#!/usr/bin/env python
# accumulator
from threading import Thread
from time import sleep
# calls to acc.inc() add to accumulated values.
# if at each timer tick, dx is nonzero, we update x
# and call handler with the new value.
class Acc:
def __init__(self,handler):
self.thread = Thread(target=self.run)
self.handler = handler
self.x = 0
self.dx = 0
self.should_quit = False
self.period = 1
def inc(self,y=1):
self.dx += y
def run(self):
while not self.should_quit:
if self.dx != 0:
self.x += self.dx
self.dx = 0
self.handler(self.x)
sleep(self.period)
print("Acc Thread Exiting")
def start(self):
self.thread.start()
def join(self):
self.thread.join()
# main app
from icecream import ic; ic.configureOutput(includeContext=True)
from PySide6.QtCore import *
from PySide6.QtGui import *
from PySide6.QtWidgets import *
from PySide6.QtNetwork import *
from acc import Acc
app = QApplication()
class Main(QWidget):
def __init__(self):
super().__init__()
self.resize(400,400)
self.num = QLabel(self)
self.num.setText("#")
self.num.resize(200,400)
self.num.move(0,0)
self.num.setStyleSheet("font-family: Optima; font-size: 48px;")
self.incButton = QPushButton(self)
self.incButton.setText("inc")
self.incButton.resize(200,200)
self.incButton.move(200,0)
self.incButton.clicked.connect(self.incPressed)
self.decButton = QPushButton(self)
self.decButton.setText("dec")
self.decButton.resize(200,200)
self.decButton.move(200,200)
self.decButton.clicked.connect(self.decPressed)
self.acc = Acc(self.handler)
self.acc.period = 2
self.acc.start()
def killthread(self):
self.acc.should_quit = True
def incPressed(self,*xs,**kw):
ic(xs,kw)
self.acc.inc(1)
def decPressed(self,*xs,**kw):
ic(xs,kw)
self.acc.inc(-1)
def handler(self,x):
x = f"{x}"
self.num.setText(x)
main = Main()
main.show()
app.exec()
main.killthread()