add mock mode, websocket display stream, and text editor

- mock_rpi_ws281x: stub for running without RPi hardware; auto-fallback
  on RuntimeError, forced via --mock flag or MOCK_LED env var
- mock mode runs web server on port 8080 instead of 80
- replace display polling with WebSocket stream (/ws/display); client
  sends start/stop messages, server pushes binary RGB frames
- force-button feedback: button shows waiting state and polls until
  the cycle clears the flag
- text editor UI: edit top and bottom texts with per-segment color
  pickers; saved to texts.json for persistence across restarts
- bottom and top texts read from server state instead of hardcoded
- bottom text switched to _add_multi_color_node to support multi-color
- fix bottom text invisible: split scroll_all_multi_color_texts into
  top (y<8) and bottom (y>=8) passes so fill_pixels does not wipe
  bottom nodes before they are drawn; this also fixes force-vehicles
  polling loop that stalled when the bottom node never expired

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
cos
2026-04-30 21:15:39 +02:00
parent 0d06ab7ff2
commit 7d2fe96704
6 changed files with 522 additions and 84 deletions
+62 -24
View File
@@ -10,7 +10,17 @@ import random
import socket
import sys
from rpi_ws281x import PixelStrip, ws
import os
_mock = '--mock' in sys.argv or os.environ.get('MOCK_LED', '0') != '0'
if _mock:
from mock_rpi_ws281x import PixelStrip, ws
else:
try:
from rpi_ws281x import PixelStrip, ws
except ImportError:
from mock_rpi_ws281x import PixelStrip, ws
import config
import low_level
@@ -30,7 +40,16 @@ strip = PixelStrip(
channel=0,
strip_type=ws.WS2811_STRIP_GRB,
)
strip.begin()
if not _mock:
try:
strip.begin()
except RuntimeError as e:
print(f'[warn] rpi_ws281x init failed ({e}), falling back to mock', file=sys.stderr)
from mock_rpi_ws281x import PixelStrip as _MockStrip, ws as _mock_ws
strip = _MockStrip(config.NUMPIXELS, config.PIN)
_mock = True
else:
strip.begin()
low_level.strip = strip
_udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
@@ -244,10 +263,12 @@ def scroll_all_texts(split_scroll_mode=False):
for ch in node['content']:
_draw_char(ch, node['char_height'], node['char_width'], r, g, b, cursor)
def scroll_all_multi_color_texts(split_scroll_mode=False):
def scroll_all_multi_color_texts(split_scroll_mode=False, y_min=0, y_max=None):
for node in multi_color_text_nodes:
if node['disappear_time'] == 0:
continue
if node['pos_y'] < y_min or (y_max is not None and node['pos_y'] >= y_max):
continue
if node['is_scrolling']:
if node['scroll_slowness'] > node['scroll_progress']:
node['scroll_progress'] += 1
@@ -292,8 +313,10 @@ def handle_disappear_timers():
0, 0, 0
)
def handle_multi_color_disappear_timers():
def handle_multi_color_disappear_timers(y_min=0, y_max=None):
for node in multi_color_text_nodes:
if node['pos_y'] < y_min or (y_max is not None and node['pos_y'] >= y_max):
continue
if node['disappear_time'] > 0:
node['disappear_time'] -= 1
if node['disappear_time'] == 0:
@@ -340,9 +363,12 @@ def handle_program1():
elif not main_animation_started:
if not _is_node_existing(0) and not _is_node_existing(1):
main_animation_started = True
_tc = server.state.get('texts', {}).get('top', [{}])
_te = _tc[0] if _tc else {}
_tclrs = [(_c['r'], _c['g'], _c['b'], _c['from']) for _c in _te.get('colors', [{'r':40,'g':40,'b':40,'from':0}, {'r':0,'g':0,'b':50,'from':9}])]
node = _add_multi_color_node(
"Witamy w PTI",
[(40, 40, 40, 0), (0, 0, 50, 9)],
_te.get('text', 'Witamy w PTI'),
_tclrs,
0, 0, 3, True, True, -1, False
)
if node:
@@ -364,9 +390,12 @@ def handle_program1():
return
# Re-add top text from right edge
_tc = server.state.get('texts', {}).get('top', [{}])
_te = _tc[0] if _tc else {}
_tclrs = [(_c['r'], _c['g'], _c['b'], _c['from']) for _c in _te.get('colors', [{'r':40,'g':40,'b':40,'from':0}, {'r':0,'g':0,'b':50,'from':9}])]
node = _add_multi_color_node(
"Witamy w PTI",
[(40, 40, 40, 0), (0, 0, 50, 9)],
_te.get('text', 'Witamy w PTI'),
_tclrs,
config.DISPLAY_MAX_X, 0, 3, True, True, -1, False
)
if node:
@@ -378,6 +407,9 @@ def handle_program1():
for node in text_nodes:
if node['disappear_time'] != 0:
node['disappear_time'] = 1
for node in multi_color_text_nodes:
if node['global_id'] == scroll_node_global_id and node['disappear_time'] != 0:
node['disappear_time'] = 1
scroll_node_active = False
if program2_active:
program2_active = False
@@ -404,18 +436,23 @@ def handle_program1():
low_level.fill_pixels(0, 9, config.DISPLAY_MAX_X, config.DISPLAY_MAX_Y, 0, 0, 0)
return
texts = [
("Technik informatyk + ai, robotyka", (0, 0, 50)),
("Technik programista", (50, 40, 0)),
("Technik poligrafii i grafiki komputerowej", (0, 45, 0)),
("Technik reklamy", (60, 20, 0)),
]
text, color = texts[bottom_text_state]
node = _add_text_node(text, color, config.DISPLAY_MAX_X, 9, 1, True, True, -1, False)
if node is not None:
scroll_node_global_id = node['global_id']
scroll_node_active = True
bottom_text_state = (bottom_text_state + 1) % 4
_bottom_entries = server.state.get('texts', {}).get('bottom', [])
if not _bottom_entries:
_bottom_entries = [
{'text': 'Technik informatyk + ai, robotyka', 'colors': [{'r': 0, 'g': 0, 'b': 50, 'from': 0}]},
{'text': 'Technik programista', 'colors': [{'r': 50, 'g': 40, 'b': 0, 'from': 0}]},
{'text': 'Technik poligrafii i grafiki komputerowej', 'colors': [{'r': 0, 'g': 45, 'b': 0, 'from': 0}]},
{'text': 'Technik reklamy', 'colors': [{'r': 60, 'g': 20, 'b': 0, 'from': 0}]},
]
if _bottom_entries:
_idx = bottom_text_state % len(_bottom_entries)
_be = _bottom_entries[_idx]
_bclrs = [(_c['r'], _c['g'], _c['b'], _c['from']) for _c in _be.get('colors', [{'r': 50, 'g': 50, 'b': 50, 'from': 0}])]
node = _add_multi_color_node(_be.get('text', ''), _bclrs, config.DISPLAY_MAX_X, 9, 1, True, True, -1, False)
if node is not None:
scroll_node_global_id = node['global_id']
scroll_node_active = True
bottom_text_state = (bottom_text_state + 1) % len(_bottom_entries)
def reset_dvd():
@@ -529,7 +566,7 @@ def setup():
for i in range(config.NUMPIXELS):
strip.setPixelColor(i, 0)
strip.show()
server.start_server(port=80)
server.start_server(port=8080 if _mock else 80)
print("Server started. Running display loop.")
@@ -553,9 +590,8 @@ def loop():
_send_frame()
return
if s['program_top_text_enabled']:
scroll_all_multi_color_texts(split_scroll_mode=True)
handle_multi_color_disappear_timers()
scroll_all_multi_color_texts(split_scroll_mode=True, y_max=8)
handle_multi_color_disappear_timers(y_max=8)
if not s['program_bottom_text_enabled']:
if program2_active:
@@ -573,6 +609,8 @@ def loop():
low_level.fill_pixels(0, 9, config.DISPLAY_MAX_X, config.DISPLAY_MAX_Y, 0, 0, 0)
scroll_all_texts(split_scroll_mode=True)
handle_disappear_timers()
scroll_all_multi_color_texts(split_scroll_mode=True, y_min=8)
handle_multi_color_disappear_timers(y_min=8)
strip.show()
_send_frame()