Source code for otio_sync_core.network

"""UDP broadcast network backend and shared network Protocol for OTIO Sync."""

from __future__ import annotations

import json
import socket
from typing import Any, Protocol, runtime_checkable


[docs] @runtime_checkable class SyncNetworkProtocol(Protocol): """Structural interface that all network backends must satisfy. Both :class:`UDPNetwork` and :class:`~otio_sync_core.rabbitmq_network.RabbitMQNetwork` conform to this protocol, allowing :class:`~otio_sync_core.manager.SyncManager` to accept either without a concrete base class. """
[docs] def send_payload(self, payload: dict[str, Any]) -> None: """Broadcast *payload* to all peers in the session.""" ...
[docs] def receive_payloads(self) -> list[dict[str, Any]]: """Return all payloads received since the last call, without blocking.""" ...
[docs] def stop(self) -> None: """Shut down the network connection and release resources.""" ...
[docs] def get_local_broadcast() -> str: """Derive the LAN broadcast address from the default route interface. Falls back to ``255.255.255.255`` if the address cannot be determined. :returns: Broadcast IP address string, e.g. ``"192.168.1.255"``. """ try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(('8.8.8.8', 80)) ip = s.getsockname()[0] s.close() parts = ip.split('.') parts[-1] = '255' return '.'.join(parts) except Exception: return '255.255.255.255'
[docs] class UDPNetwork: """LAN broadcast network backend using UDP. Opens a non-blocking receive socket bound to *port* and a separate send socket with ``SO_BROADCAST`` set. All peers on the same LAN segment that bind to the same port will receive every message. Self-filtering is done via *self_guid*: any received payload whose ``source_guid`` matches is silently discarded. :param port: UDP port to bind and broadcast on. :param broadcast_ip: Explicit broadcast address; auto-detected when ``None``. :param self_guid: GUID of the local peer used to filter own messages. """ def __init__( self, port: int = 9999, broadcast_ip: str | None = None, self_guid: str | None = None, ) -> None: self.port = port self.broadcast_ip = broadcast_ip or get_local_broadcast() self.self_guid = self_guid self.send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.send_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) self.recv_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.recv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: self.recv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) except AttributeError: pass self.recv_sock.bind(('', self.port)) self.recv_sock.setblocking(False)
[docs] def send_payload(self, payload: dict[str, Any]) -> None: """Broadcast *payload* as JSON to the LAN. Injects ``source_guid`` into the payload if not already present. :param payload: Message envelope to broadcast. """ try: if self.self_guid and "source_guid" not in payload: payload["source_guid"] = self.self_guid data = json.dumps(payload).encode('utf-8') self.send_sock.sendto(data, (self.broadcast_ip, self.port)) except Exception as e: print(f"Failed to send payload: {e}")
[docs] def receive_payloads(self) -> list[dict[str, Any]]: """Drain all available UDP datagrams and return them as parsed dicts. Non-blocking; returns an empty list when no data is waiting. Own messages (matched by ``source_guid``) are silently dropped. :returns: List of received payload dicts. """ payloads = [] while True: try: data, _ = self.recv_sock.recvfrom(65535) try: payload = json.loads(data.decode('utf-8')) if self.self_guid and payload.get("source_guid") == self.self_guid: continue payloads.append(payload) except json.JSONDecodeError: pass except BlockingIOError: break except Exception as e: print(f"Error receiving payload: {e}") break return payloads
[docs] def close(self) -> None: """Close both sockets immediately.""" self.send_sock.close() self.recv_sock.close()
[docs] def stop(self) -> None: """Alias for :meth:`close`; satisfies :class:`SyncNetworkProtocol`.""" self.close()