Source code for otio_sync_core.rabbitmq_network

"""RabbitMQ fanout-exchange network backend for OTIO Sync."""

from __future__ import annotations

import json
import logging as _logging
import queue
import threading
import uuid
from typing import Any

import pika

_logger = _logging.getLogger("otio_sync")


def _log(msg: str) -> None:
    if _logger.handlers:
        _logger.debug(msg)


[docs] class RabbitMQNetwork: """RabbitMQ network backend for OTIO Sync. Uses a **fanout exchange** so that every peer bound to the same exchange receives every published message. The exchange name is derived from *session_id*, which implicitly scopes peers to a session without any server-side configuration. Two dedicated background threads handle I/O so callers are never blocked by pika: * ``_consumer_thread`` — owns a ``BlockingConnection`` for receiving; pushes decoded payloads onto ``_incoming_queue``. * ``_publisher_thread`` — owns a separate ``BlockingConnection`` for sending; drains ``_send_queue`` in a tight loop with automatic reconnection on failure. ``send_payload`` therefore never touches a socket directly; it is always non-blocking for the caller. Self-filtering is applied in the consumer callback: any message whose ``source_guid`` matches *self_guid* is silently discarded before being enqueued. :param host: RabbitMQ broker hostname or IP. :param port: RabbitMQ broker AMQP port. :param session_id: Logical session name; used to derive the exchange name. :param self_guid: GUID of the local peer used to filter own messages. Auto-generated if not provided. """ def __init__( self, host: str = '127.0.0.1', port: int = 5672, session_id: str = 'otio-sync-default', self_guid: str | None = None, ) -> None: self.host = host self.port = port self.session_id = session_id self.self_guid = self_guid or str(uuid.uuid4()) self.exchange_name = f"sync_session_{session_id}" self._incoming_queue: queue.Queue[dict[str, Any]] = queue.Queue() self._send_queue: queue.Queue[dict[str, Any]] = queue.Queue() self._stop_event = threading.Event() # Set once the consumer queue is bound and basic_consume is registered. # Callers should wait on this before broadcasting WHO_IS_MASTER so that # the I_AM_MASTER response is not published before the queue exists. self._consumer_ready = threading.Event() self._consumer_thread = threading.Thread( target=self._run_consumer, daemon=True ) self._consumer_thread.start() self._publisher_thread = threading.Thread( target=self._run_publisher, daemon=True, name="rmq_publisher" ) self._publisher_thread.start() def _run_consumer(self) -> None: """Background consumer loop with automatic reconnection. Blocks on ``process_data_events`` in a tight loop, reconnecting with a 5-second delay whenever the broker connection drops. Exits cleanly when :attr:`_stop_event` is set. """ while not self._stop_event.is_set(): try: connection = pika.BlockingConnection( pika.ConnectionParameters(host=self.host, port=self.port) ) channel = connection.channel() channel.exchange_declare( exchange=self.exchange_name, exchange_type='fanout' ) result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange=self.exchange_name, queue=queue_name) def callback( ch: Any, method: Any, properties: Any, body: bytes, ) -> None: try: payload = json.loads(body.decode('utf-8')) if payload.get("source_guid") == self.self_guid: return _log( f"\n=== MQ RECV [{self.exchange_name}] ===\n" f"{json.dumps(payload, indent=2)}\n" ) self._incoming_queue.put(payload) except Exception as e: _log(f"Error processing message: {e}") channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True, ) _log( f"Connected to {self.host}:{self.port}, " f"listening on {self.exchange_name}" ) self._consumer_ready.set() while not self._stop_event.is_set(): connection.process_data_events(time_limit=1) connection.close() except Exception as e: if not self._stop_event.is_set(): _log(f"Consumer error: {e}. Retrying in 5s...") self._stop_event.wait(5) def _run_publisher(self) -> None: """Background publisher loop with automatic reconnection. Owns its own ``BlockingConnection`` so publishing never blocks the poll thread. Drains ``_send_queue`` as fast as the broker accepts messages, reconnecting with a 5-second delay on failure. Exits cleanly when :attr:`_stop_event` is set and the queue is empty. """ while not self._stop_event.is_set(): try: connection = pika.BlockingConnection( pika.ConnectionParameters(host=self.host, port=self.port) ) channel = connection.channel() channel.exchange_declare( exchange=self.exchange_name, exchange_type='fanout' ) _log(f"Publisher connected to {self.host}:{self.port}") while not self._stop_event.is_set(): try: data = self._send_queue.get(timeout=0.1) except queue.Empty: # Keep the connection alive while idle. connection.process_data_events(time_limit=0) continue channel.basic_publish( exchange=self.exchange_name, routing_key='', body=data, ) connection.close() except Exception as e: if not self._stop_event.is_set(): _log(f"Publisher error: {e}. Retrying in 5s...") self._stop_event.wait(5)
[docs] def wait_until_ready(self, timeout: float = 5.0) -> bool: """Block until the consumer queue is bound and ready to receive messages. :param timeout: Maximum seconds to wait before returning False. :returns: True if the consumer became ready within *timeout*, else False. :rtype: bool """ return self._consumer_ready.wait(timeout=timeout)
[docs] def send_payload(self, payload: dict[str, Any]) -> None: """Enqueue *payload* for publishing to the fanout exchange. Non-blocking: the actual socket write happens on the publisher thread. Injects ``source_guid`` into the payload if not already present. :param payload: Message envelope to broadcast. """ if "source_guid" not in payload: payload["source_guid"] = self.self_guid _log( f"\n=== MQ SEND [{self.exchange_name}] ===\n" f"{json.dumps(payload, indent=2)}\n" ) self._send_queue.put(json.dumps(payload).encode('utf-8'))
[docs] def receive_payloads(self) -> list[dict[str, Any]]: """Drain the internal queue and return all pending payloads. Non-blocking; returns an empty list when nothing is waiting. Messages are populated by the background consumer thread. :returns: List of received payload dicts. """ payloads: list[dict[str, Any]] = [] while not self._incoming_queue.empty(): try: payloads.append(self._incoming_queue.get_nowait()) except queue.Empty: break return payloads
[docs] def stop(self) -> None: """Signal background threads to exit and wait for them to finish. Blocks for up to 2 seconds per thread. """ self._stop_event.set() if self._consumer_thread.is_alive(): self._consumer_thread.join(timeout=2) if self._publisher_thread.is_alive(): self._publisher_thread.join(timeout=2)