"""Core synchronisation manager for the OTIO Sync protocol.
:class:`SyncManager` maintains a GUID-indexed map of every OTIO object in the shared
session and coordinates mutations across a pluggable network layer. It implements the
master-election handshake, delta buffering during join, and all broadcast helpers
defined in the OTIO Sync Protocol v1 proposal.
"""
from __future__ import annotations
import json
import logging as _logging
import time
import uuid
from typing import Any, Callable
import opentimelineio as otio
from .network import SyncNetworkProtocol
from .proxy import OTIOSyncProxy
from .patcher import OTIOPatcher, _otio_to_dict, _dict_to_otio
from .protocol_messages import (
ProtocolMessage,
message_for,
AddTimeline,
DisplaySettingsSet,
IAmMaster,
InsertChild,
PartialAnnotation,
PlaybackSettingsSet,
RenameTimeline,
ReplaceAnnotationCommands,
SelectionSet,
SetProperty,
StateRequest,
StateSnapshot,
WhoIsMaster,
)
_logger = _logging.getLogger("otio_sync")
def _log(msg: str) -> None:
if _logger.handlers:
_logger.debug(msg)
[docs]
def sync_event_schema(cmd: Any) -> str:
"""Return the OTIO schema name for a SyncEvent object or a serialised dict.
Centralises the ``hasattr(cmd, "schema_name") / isinstance(cmd, dict)``
pattern that appears throughout annotation-handling code.
:param cmd: A deserialised SyncEvent object or a raw ``dict`` whose
``"OTIO_SCHEMA"`` key carries the schema name.
:returns: Schema name string (e.g. ``"PaintStart.1"``), or ``""`` if
*cmd* is neither.
:rtype: str
"""
if hasattr(cmd, "schema_name"):
return cmd.schema_name()
if isinstance(cmd, dict):
return cmd.get("OTIO_SCHEMA", "")
return ""
#: Session has not yet started.
STATE_NONE = "NONE"
#: Broadcasting ``WHO_IS_MASTER``; waiting for a response.
STATE_DISCOVERING = "DISCOVERING"
#: Master found; waiting for a full state snapshot.
STATE_JOINING = "JOINING"
#: Snapshot received and applied; fully participating in the session.
STATE_SYNCED = "SYNCED"
[docs]
class SyncManager:
"""Coordinates OTIO object synchronisation across a network session.
The manager maintains two complementary data structures:
* ``_object_map`` — a flat ``{guid: otio_object}`` index for O(1) lookup by GUID.
* ``_timelines`` — a ``{guid: Timeline}`` map of every registered top-level timeline.
All mutations (inserts, removals, property changes) are applied locally **and**
broadcast to peers via the injected *network* backend. Incoming messages are
applied through :meth:`apply_patch`, which also fires registered observer callbacks
so that the host application (e.g. the RV plugin) can react to remote changes.
**Session lifecycle**
1. Call :meth:`start_session` — status transitions to ``STATE_DISCOVERING``.
2. The caller polls :meth:`receive_and_apply_all` until a ``master_found`` action
is returned, then calls :meth:`request_state`.
3. Status transitions to ``STATE_JOINING``; incoming non-session messages are
buffered in ``_delta_buffer``.
4. When a ``state_snapshot_received`` action is returned, the caller invokes
:meth:`apply_snapshot` which applies the full state and replays buffered deltas
before transitioning to ``STATE_SYNCED``.
If no master responds within the discovery timeout (implemented in the caller),
the caller elects itself master and calls :meth:`broadcast_master_response`.
:param session_id: Logical session identifier; scopes all network messages.
:param self_guid: Stable GUID for this peer; auto-generated when not provided.
:param network: Network backend satisfying :class:`~otio_sync_core.network.SyncNetworkProtocol`.
May be set or replaced after construction.
"""
def __init__(
self,
session_id: str = "default_session",
self_guid: str | None = None,
network: SyncNetworkProtocol | None = None,
) -> None:
self.session_id = session_id
self.self_guid: str = self_guid or str(uuid.uuid4())
self.network: SyncNetworkProtocol | None = network
self.patcher = OTIOPatcher()
self._timelines: dict[str, otio.schema.Timeline] = {}
#: Maps seq_clip_guid → clip_timeline_guid for all single-clip timelines.
self._clip_timelines: dict[str, str] = {}
self.active_timeline_guid: str | None = None
self._status_callbacks: list[Callable[[str], None]] = []
self._playback_callbacks: list[Callable[[dict[str, Any]], None]] = []
self._display_callbacks: list[Callable[[dict[str, Any]], None]] = []
self._synced_callbacks: list[Callable[[], None]] = []
# Register internal callback to broadcast property changes
@self.patcher.on_property_changed
def _on_local_property_changed(target_uuid: str, path: str, value: Any) -> None:
if not self._is_syncing and self.network:
self._send_message(
SetProperty(
target_uuid=target_uuid,
path=path,
value=value,
sync_timestamp=time.time(),
)
)
self.status: str = STATE_NONE
self.is_master: bool = False
self.master_guid: str | None = None
self._delta_buffer: list[dict[str, Any]] = []
self._last_snapshot_time: float = 0
self._last_who_is_master_time: float | None = None
self._state_request_time: float | None = None
#: Last received playback state dict; empty until the first playback message.
self.playback_state: dict[str, Any] = {}
#: Last received display state dict; empty until the first display message.
#: Keys: ``pan`` ([x, y] normalised), ``zoom`` (float), ``exposure`` (stops),
#: ``channel`` (``"RGBA"``, ``"R"``, ``"G"``, ``"B"``, or ``"A"``).
self.display_state: dict[str, Any] = {}
#: GUID of the clip most recently selected by a remote peer via a
#: ``SELECTION`` broadcast. ``None`` when the selection is cleared.
self.selected_clip_guid: str | None = None
#: Receive-side dispatch table: ``(command_schema, event)`` -> handler.
#: Each handler takes ``(msg, data, source)`` and returns an
#: ``(action, data)`` tuple or ``None``. All OTIO_SESSION events route
#: to a single handler that delegates to the patcher.
self._handlers: dict[
tuple[str, str],
Callable[[ProtocolMessage, dict[str, Any], str], "tuple[str, Any] | None"],
] = {
("LiveSession.1", "WHO_IS_MASTER"): self._h_who_is_master,
("LiveSession.1", "I_AM_MASTER"): self._h_i_am_master,
("LiveSession.1", "STATE_REQUEST"): self._h_state_request,
("LiveSession.1", "STATE_SNAPSHOT"): self._h_state_snapshot,
("PLAYBACK_SETTINGS_1.0", "SET"): self._h_playback_set,
("DISPLAY_SETTINGS_1.0", "SET"): self._h_display_set,
("SELECTION_1.0", "SET"): self._h_selection_set,
("TIMELINE_1.0", "ADD_TIMELINE"): self._h_add_timeline,
("TIMELINE_1.0", "RENAME_TIMELINE"): self._h_rename_timeline,
("Annotation.1", "PARTIAL"): self._h_partial_annotation,
("OTIO_SESSION_1.0", "SET_PROPERTY"): self._h_otio_session,
("OTIO_SESSION_1.0", "INSERT_CHILD"): self._h_otio_session,
("OTIO_SESSION_1.0", "MOVE_CHILD"): self._h_otio_session,
("OTIO_SESSION_1.0", "REMOVE_CHILD"): self._h_otio_session,
("OTIO_SESSION_1.0", "REPLACE_ANNOTATION_COMMANDS"): self._h_otio_session,
}
# ------------------------------------------------------------------
# Properties
# ------------------------------------------------------------------
@property
def _object_map(self) -> dict[str, otio.core.SerializableObject]:
return self.patcher.object_map
@_object_map.setter
def _object_map(self, val: dict[str, otio.core.SerializableObject]) -> None:
self.patcher.object_map = val
@property
def _is_syncing(self) -> bool:
return self.patcher._is_syncing
@_is_syncing.setter
def _is_syncing(self, val: bool) -> None:
self.patcher._is_syncing = val
@property
def _property_callbacks(self) -> list[Callable[[str, str, Any], None]]:
return self.patcher._property_callbacks
@property
def _hierarchy_callbacks(self) -> list[Callable[[str, str, str], None]]:
return self.patcher._hierarchy_callbacks
@property
def is_syncing(self) -> bool:
"""``True`` while a snapshot or incoming delta is being applied locally.
Callers can read this to suppress outgoing broadcasts that would echo
changes back to their source.
:rtype: bool
"""
return self._is_syncing
@property
def root_timeline(self) -> otio.schema.Timeline | None:
"""The active timeline, or the first registered timeline when none is active.
:returns: Active :class:`~opentimelineio.schema.Timeline`, or ``None`` if no
timelines have been registered.
"""
if self.active_timeline_guid:
tl = self._timelines.get(self.active_timeline_guid)
if tl is not None:
return tl
return next(iter(self._timelines.values()), None)
@property
def timelines(self) -> dict[str, otio.schema.Timeline]:
"""Read-only view of all registered timelines, keyed by sync GUID."""
return self._timelines
@property
def object_map(self) -> dict[str, otio.core.SerializableObject]:
"""Read-only view of the flat GUID → OTIO object index."""
return self._object_map
@property
def active_clip_guid(self) -> "str | None":
"""Sequence clip GUID if the active timeline is a single-clip timeline, else ``None``.
:rtype: str or None
"""
if not self.active_timeline_guid:
return None
for clip_guid, tl_guid in self._clip_timelines.items():
if tl_guid == self.active_timeline_guid:
return clip_guid
return None
@property
def sequence_timeline_guid(self) -> "str | None":
"""GUID of the first registered timeline that is *not* a clip timeline.
:rtype: str or None
"""
clip_tl_guids = set(self._clip_timelines.values())
for guid in self._timelines:
if guid not in clip_tl_guids:
return guid
return None
# ------------------------------------------------------------------
# Timeline Registration
# ------------------------------------------------------------------
[docs]
def register_timeline(self, timeline: otio.schema.Timeline) -> OTIOSyncProxy:
"""Register a timeline, assign GUIDs to all its objects, and index them.
Sets :attr:`active_timeline_guid` to the new timeline's GUID if no active
timeline exists yet.
:param timeline: The :class:`~opentimelineio.schema.Timeline` to register.
:returns: An :class:`~otio_sync_core.proxy.OTIOSyncProxy` wrapping *timeline*
so that attribute writes are automatically broadcast.
"""
self._ensure_guid_and_map(timeline)
guid = timeline.metadata["sync"]["guid"]
self._timelines[guid] = timeline
self._traverse_and_map(timeline)
if self.active_timeline_guid is None:
self.active_timeline_guid = guid
return OTIOSyncProxy(timeline, self.patcher)
[docs]
def get_or_create_clip_timeline(self, clip_guid: str) -> "str | None":
"""Return the GUID of the single-clip timeline for *clip_guid*, creating it lazily.
All peers independently derive the **same** GUIDs via :meth:`_derive_guid`,
so no coordination message is required before clips can be used across
peers. Callers should broadcast the timeline via
:meth:`broadcast_clip_timeline` the first time it is created so that
peers without local creation can register the annotation track in their
``_object_map`` (required for receiving annotation ``INSERT_CHILD``
patches).
The clip copy inside the clip timeline shares the same sync GUID as the
sequence clip. :meth:`_traverse_and_map_preserve` ensures the sequence
clip remains canonical in ``_object_map`` so that
``range_in_parent()`` returns the sequence-level position.
:param clip_guid: Sync GUID of the target sequence clip.
:returns: GUID of the clip timeline, or ``None`` if *clip_guid* is not
a known :class:`~opentimelineio.schema.Clip`.
:rtype: str or None
"""
if clip_guid in self._clip_timelines:
return self._clip_timelines[clip_guid]
seq_clip = self._object_map.get(clip_guid)
if seq_clip is None or not isinstance(seq_clip, otio.schema.Clip):
_log(f"get_or_create_clip_timeline: clip {clip_guid} not in object_map or not a Clip")
return None
clip_tl_guid = self._derive_guid(f"clip_timeline:{clip_guid}")
video_track_guid = self._derive_guid(f"clip_timeline_video_track:{clip_guid}")
ann_track_guid = self._derive_guid(f"clip_timeline_ann_track:{clip_guid}")
# Deep-copy the clip preserving its sync GUID so annotations cross-reference.
clip_copy = _dict_to_otio(_otio_to_dict(seq_clip))
clip_copy.metadata.setdefault("sync", {})["guid"] = clip_guid
tl = otio.schema.Timeline(name=getattr(seq_clip, "name", None) or "clip")
tl.metadata["sync"] = {"guid": clip_tl_guid}
tl.metadata["clip_timeline_for"] = clip_guid
video_track = otio.schema.Track(
name="V1", kind=otio.schema.TrackKind.Video
)
video_track.metadata["sync"] = {"guid": video_track_guid}
video_track.append(clip_copy)
ann_track = otio.schema.Track(name="Annotations")
ann_track.metadata["sync"] = {"guid": ann_track_guid}
tl.tracks.append(video_track)
tl.tracks.append(ann_track)
self._timelines[clip_tl_guid] = tl
# Use preserve so the sequence clip stays canonical in _object_map.
self._traverse_and_map_preserve(tl)
self._clip_timelines[clip_guid] = clip_tl_guid
_log(
f"get_or_create_clip_timeline: created clip_tl={clip_tl_guid[:8]} "
f"for clip={clip_guid[:8]}"
)
return clip_tl_guid
[docs]
def broadcast_add_timeline(self, tl_guid: str) -> None:
"""Broadcast a timeline to all peers so they can register it.
Works for both sequence timelines (new playlist / new sequence) and
single-clip annotation timelines. Call once immediately after
:meth:`register_timeline` to propagate a locally-created timeline to
all connected peers. Peers that already hold the same GUID silently
ignore the message.
:param tl_guid: GUID of the timeline to broadcast.
"""
if not self.network or self.status != STATE_SYNCED:
return
tl = self._timelines.get(tl_guid)
if tl is None:
return
self._send_message(
AddTimeline(
timeline_guid=tl_guid,
timeline=tl,
sync_timestamp=time.time(),
)
)
[docs]
def broadcast_clip_timeline(self, tl_guid: str) -> None:
"""Broadcast a clip timeline to all peers so they can register its annotation track.
Should be called once per clip timeline, immediately after
:meth:`get_or_create_clip_timeline` returns a new GUID. Peers that
already have the timeline (same deterministic GUID) will skip the
``ADD_TIMELINE`` message.
Delegates to :meth:`broadcast_add_timeline`.
:param tl_guid: GUID of the clip timeline to broadcast.
"""
self.broadcast_add_timeline(tl_guid)
[docs]
def broadcast_timeline_rename(self, tl_guid: str, new_name: str) -> None:
"""Rename a timeline locally and broadcast the change to all peers.
Updates the timeline's ``name`` attribute in ``_timelines`` immediately,
then sends a ``RENAME_TIMELINE`` message so all connected peers apply the
same rename.
:param tl_guid: GUID of the timeline to rename.
:param new_name: New display name for the timeline.
"""
if self._is_syncing or not self.network or self.status != STATE_SYNCED:
return
tl = self._timelines.get(tl_guid)
if tl is None:
_log(f"broadcast_timeline_rename: unknown timeline {tl_guid}")
return
tl.name = new_name
self._send_message(
RenameTimeline(
timeline_guid=tl_guid,
name=new_name,
sync_timestamp=time.time(),
)
)
[docs]
def reset_timelines(self) -> None:
"""Clear all registered timelines, the object map, and the active GUID.
Used during master re-initialisation when the timeline data must be
rebuilt from scratch (e.g. after the RV node graph settles).
"""
self._timelines.clear()
self._object_map.clear()
self._clip_timelines.clear()
self.active_timeline_guid = None
@staticmethod
def _derive_guid(key: str) -> str:
"""Return a stable, deterministic UUID derived from *key*.
Uses :func:`uuid.uuid5` so that all peers independently compute the
same GUID for the same logical object (e.g. the clip timeline for a
given sequence clip) without any coordination message.
:param key: Namespace string (e.g. ``"clip_timeline:<seq_clip_guid>"``).
:returns: UUID string.
:rtype: str
"""
return str(uuid.uuid5(uuid.NAMESPACE_OID, key))
def _traverse_and_map(self, item: otio.core.SerializableObject) -> None:
self.patcher.traverse_and_map(item)
def _traverse_and_map_preserve(self, item: otio.core.SerializableObject) -> None:
self.patcher.traverse_and_map_preserve(item)
def _ensure_guid_and_map(self, obj: Any) -> None:
self.patcher.ensure_guid_and_map(obj)
# ------------------------------------------------------------------
# Observer Registry
# ------------------------------------------------------------------
[docs]
def on_property_changed(
self, callback: Callable[[str, str, Any], None]
) -> Callable[[str, str, Any], None]:
"""Register a callback for property change events.
Fires for both locally-initiated and remotely-applied property changes.
May be used as a decorator.
:param callback: Callable receiving ``(target_uuid, path, new_value)``.
:returns: The *callback* unchanged (decorator-compatible).
"""
self.patcher.on_property_changed(callback)
return callback
[docs]
def on_hierarchy_changed(
self, callback: Callable[[str, str, str], None]
) -> Callable[[str, str, str], None]:
"""Register a callback for hierarchy change events.
Fires for both locally-initiated and remotely-applied structural changes.
May be used as a decorator.
:param callback: Callable receiving ``(parent_uuid, action, child_uuid)``
where *action* is one of ``"insert_child"`` or ``"remove_child"``.
:returns: The *callback* unchanged (decorator-compatible).
"""
self.patcher.on_hierarchy_changed(callback)
return callback
[docs]
def on_status_changed(
self, callback: Callable[[str], None]
) -> Callable[[str], None]:
"""Register a callback fired whenever :attr:`status` transitions.
:param callback: Callable receiving the new status string.
:returns: The *callback* unchanged (decorator-compatible).
"""
self._status_callbacks.append(callback)
return callback
[docs]
def on_playback_changed(
self, callback: Callable[[dict[str, Any]], None]
) -> Callable[[dict[str, Any]], None]:
"""Register a callback fired whenever a playback-state message arrives.
The callback receives the raw playback state dict (same structure as
:attr:`playback_state`). Also usable as a decorator.
:param callback: Callable receiving the playback state dict.
:returns: The *callback* unchanged (decorator-compatible).
"""
self._playback_callbacks.append(callback)
return callback
[docs]
def on_display_changed(
self, callback: Callable[[dict[str, Any]], None]
) -> Callable[[dict[str, Any]], None]:
"""Register a callback fired whenever a display-state message arrives.
The callback receives the raw display state dict (same structure as
:attr:`display_state`). Also usable as a decorator.
:param callback: Callable receiving the display state dict.
:returns: The *callback* unchanged (decorator-compatible).
"""
self._display_callbacks.append(callback)
return callback
[docs]
def on_synced(
self, callback: Callable[[], None]
) -> Callable[[], None]:
"""Register a callback fired once when the session reaches ``STATE_SYNCED``.
Fires both when this peer self-elects as master and when it finishes
joining an existing master. Also usable as a decorator.
:param callback: Zero-argument callable.
:returns: The *callback* unchanged (decorator-compatible).
"""
self._synced_callbacks.append(callback)
return callback
def _set_status(self, new_status: str) -> None:
"""Update :attr:`status` and fire registered status-change callbacks."""
if new_status == self.status:
return
self.status = new_status
for cb in self._status_callbacks:
try:
cb(new_status)
except Exception as e:
_log(f"on_status_changed callback error: {e}")
if new_status == STATE_SYNCED:
for cb in self._synced_callbacks:
try:
cb()
except Exception as e:
_log(f"on_synced callback error: {e}")
def _fire_property_changed(
self, target_uuid: str, path: str, value: Any
) -> None:
for cb in self._property_callbacks:
try:
cb(target_uuid, path, value)
except Exception as e:
_log(f"on_property_changed callback error: {e}")
def _fire_hierarchy_changed(
self, parent_uuid: str, action: str, child_uuid: str
) -> None:
for cb in self._hierarchy_callbacks:
try:
cb(parent_uuid, action, child_uuid)
except Exception as e:
_log(f"on_hierarchy_changed callback error: {e}")
# ------------------------------------------------------------------
# Master Election & Session State
# ------------------------------------------------------------------
[docs]
def start_session(self) -> None:
"""Begin the join process by broadcasting a master-discovery message.
Transitions :attr:`status` to ``STATE_DISCOVERING``. The caller is
responsible for timing out and calling the appropriate method if no master
responds (see class docstring for the full lifecycle).
"""
self._set_status(STATE_DISCOVERING)
self.broadcast_master_discovery()
[docs]
def broadcast_master_discovery(self) -> None:
"""Broadcast a ``WHO_IS_MASTER`` session message."""
self._send_message(WhoIsMaster(requester_guid=self.self_guid))
[docs]
def broadcast_master_response(self) -> None:
"""Broadcast an ``I_AM_MASTER`` session message.
Called after self-election (discovery timeout) or when an existing master
receives a ``WHO_IS_MASTER`` it should answer.
"""
self._send_message(IAmMaster(master_guid=self.self_guid))
[docs]
def request_state(self) -> None:
"""Send a ``STATE_REQUEST`` to the master and enter ``STATE_JOINING``.
Non-session messages received while joining are buffered in
``_delta_buffer`` and replayed by :meth:`apply_snapshot`.
"""
if self.master_guid:
self._set_status(STATE_JOINING)
self._state_request_time = time.time()
self._send_message(StateRequest(
target_guid=self.master_guid,
requester_guid=self.self_guid,
))
[docs]
def send_state_snapshot(
self,
target_guid: str,
playback_state: dict[str, Any] | None = None,
) -> None:
"""Serialise all registered timelines and send a full snapshot to a joiner.
Only the master should call this method. The snapshot is broadcast to the
whole session (not unicast), but only the peer whose GUID matches *target_guid*
will act on it.
:param target_guid: GUID of the requesting peer.
:param playback_state: Optional current playback state dict to include so the
joiner can immediately seek to the right position.
"""
if not self.is_master or not self._timelines:
return
self._send_message(StateSnapshot(
target_guid=target_guid,
timelines=dict(self._timelines),
active_timeline_guid=self.active_timeline_guid,
snapshot_timestamp=time.time(),
playback_state=playback_state or None,
display_state=self.display_state or None,
))
def _send_message(self, msg: ProtocolMessage) -> None:
"""Wrap a typed :class:`ProtocolMessage` in the envelope and send it.
The envelope's ``command_schema``, ``command.event`` and
``command.payload`` are derived from the message class, so the message
definition is the single source of truth for the wire format.
:param msg: A registered :class:`ProtocolMessage` instance.
"""
if not self.network:
return
envelope: dict[str, Any] = {
"session": self.session_id,
"source_guid": self.self_guid,
"payload": {
"command_schema": msg.SCHEMA,
"command": {
"event": msg.EVENT,
"payload": msg.to_payload(),
}
}
}
if msg.ENVELOPE_SCHEMA is not None:
envelope["schema"] = msg.ENVELOPE_SCHEMA
self.network.send_payload(envelope)
# ------------------------------------------------------------------
# Data Mutations
# ------------------------------------------------------------------
[docs]
def set_property(self, target_uuid: str, path: str, value: Any) -> None:
"""Set property *path* to *value* on object *target_uuid* and broadcast.
Property paths are either plain attributes (e.g. ``"name"``) or metadata
sub-paths starting with ``"metadata/"`` (e.g. ``"metadata/annotations"``).
:param target_uuid: GUID of the target object.
:param path: Target property or metadata sub-key path.
:param value: New value; must be a primitive type.
"""
self.patcher.set_property(target_uuid, path, value)
[docs]
def insert_child(
self,
parent_uuid: str,
child_obj: otio.core.SerializableObject,
index: int = -1,
) -> None:
"""Insert *child_obj* into the parent container and broadcast the change.
A GUID is assigned to *child_obj* if it does not already have one.
Use ``index=-1`` to append.
:param parent_uuid: GUID of the parent container (Track or Stack).
:param child_obj: OTIO object to insert.
:param index: Position at which to insert; ``-1`` appends.
"""
msg = self.patcher.insert_child(parent_uuid, child_obj, index)
if not self._is_syncing and self.network and msg:
_log(
f"insert_child broadcasting: parent={parent_uuid} index={index} "
f"child={getattr(child_obj, 'name', '?')}"
)
self._send_message(msg)
[docs]
def broadcast_playback_state(
self,
state_dict: dict[str, Any],
timeline_guid: str | None = None,
) -> None:
"""Broadcast the current playback state to all peers.
:param state_dict: Playback state fields (``playing``, ``current_time``,
``looping``, etc.) as defined by the protocol.
:param timeline_guid: GUID of the timeline being played; falls back to
:attr:`active_timeline_guid`.
"""
if self._is_syncing or not self.network:
return
inner = dict(state_dict)
inner["sync_timestamp"] = time.time()
inner["timeline_guid"] = timeline_guid or self.active_timeline_guid
self._send_message(PlaybackSettingsSet.from_payload(inner))
[docs]
def broadcast_display_state(self, state_dict: dict[str, Any]) -> None:
"""Broadcast the current display state to all peers and persist it.
Expected keys in *state_dict*:
* ``pan`` — ``[x, y]`` normalised pan offset.
* ``zoom`` — zoom multiplier (``1.0`` = no zoom).
* ``exposure`` — exposure adjustment in stops (``0.0`` = no change).
* ``channel`` — active channel string: ``"RGBA"``, ``"R"``, ``"G"``,
``"B"``, or ``"A"``.
The state is also written into the active timeline's
``metadata["display_settings"]`` so it survives a full session teardown
if the OTIO file is saved to disk.
:param state_dict: Display state fields as listed above.
"""
if self._is_syncing or not self.network:
return
inner = dict(state_dict)
inner["sync_timestamp"] = time.time()
self.display_state = inner
tl = self.root_timeline
if tl is not None:
tl.metadata["display_settings"] = {
k: v for k, v in inner.items() if k != "sync_timestamp"
}
self._send_message(DisplaySettingsSet.from_payload(inner))
@staticmethod
def _annotation_track_end(track: otio.schema.Track) -> int:
"""Return the total duration (in frames) of all children in *track*.
This is the track position at which the next appended child would start,
analogous to ``lastframe`` in ``ORIAnnotations._export_otio_media``.
:param track: An OTIO :class:`~opentimelineio.schema.Track`.
:returns: Sum of ``source_range.duration.value`` for all children.
:rtype: int
"""
total = 0
for child in track:
sr = getattr(child, "source_range", None)
if sr is not None:
total += int(sr.duration.value)
return total
@staticmethod
def _find_annotation_clip_at(
track: otio.schema.Track,
clip_guid: str,
frame: int,
) -> "otio.schema.Clip | None":
"""Find an existing annotation clip for *(clip_guid, frame)* in *track*.
:param track: The Annotations :class:`~opentimelineio.schema.Track`.
:param clip_guid: GUID of the media clip being annotated.
:param frame: 0-indexed clip-local frame number.
:returns: The matching :class:`~opentimelineio.schema.Clip`, or ``None``.
"""
for child in track:
if not isinstance(child, otio.schema.Clip):
continue
if child.metadata.get("clip_guid") != clip_guid:
continue
sr = getattr(child, "source_range", None)
if sr is not None and int(sr.start_time.value) == frame:
return child
return None
@staticmethod
def _try_merge_annotation(
parent: otio.schema.Track,
child_obj: otio.core.SerializableObject,
) -> "otio.schema.Clip | None":
"""Check whether *child_obj* is an annotation-merge delta and apply it.
If *parent* already contains a clip for the same ``(clip_guid, frame)``
as *child_obj*, the incoming ``annotation_commands`` are appended to
that existing clip and the existing clip is returned (so the caller can
raise an ``annotation_commands_added`` event without inserting a
structural duplicate). Returns ``None`` when no merge applies.
:param parent: The parent track that would receive *child_obj*.
:param child_obj: The incoming OTIO object from an ``INSERT_CHILD`` message.
:returns: The existing clip if a merge occurred, otherwise ``None``.
"""
if not isinstance(parent, otio.schema.Track):
return None
if not hasattr(child_obj, "metadata"):
return None
incoming_cmds = child_obj.metadata.get("annotation_commands")
incoming_cg = child_obj.metadata.get("clip_guid")
incoming_sr = getattr(child_obj, "source_range", None)
if not incoming_cmds or not incoming_cg or incoming_sr is None:
return None
incoming_frame = int(incoming_sr.start_time.value)
existing = SyncManager._find_annotation_clip_at(
parent, incoming_cg, incoming_frame
)
if existing is None:
return None
existing.metadata["annotation_commands"].extend(incoming_cmds)
return existing
@staticmethod
def _make_annotation_clip(
clip_guid: str,
clip_local_time: otio.opentime.RationalTime,
otio_events: list,
) -> otio.schema.Clip:
"""Build a 1-frame annotation clip for *clip_guid* at *clip_local_time*.
:param clip_guid: GUID of the media clip being annotated.
:param clip_local_time: 0-indexed time within the clip source range.
:param otio_events: Deserialised SyncEvent objects to embed.
:returns: A new :class:`~opentimelineio.schema.Clip`.
"""
frame = int(clip_local_time.value)
fps = clip_local_time.rate
clip = otio.schema.Clip(name=f"Annotation_{frame}")
clip.source_range = otio.opentime.TimeRange(
clip_local_time,
otio.opentime.RationalTime(1, fps),
)
clip.metadata["annotation_commands"] = otio_events
clip.metadata["clip_guid"] = clip_guid
return clip
[docs]
def annotation_track_guid_for_clip(
self,
clip_guid: str,
preferred_timeline_guid: "str | None" = None,
) -> "str | None":
"""Return the GUID of the Annotations track in the same timeline as *clip_guid*.
Searches every non-annotation track for *clip_guid*, then returns the
first track whose name contains ``"annotation"`` (case-insensitive) from
that same timeline.
When *preferred_timeline_guid* is provided (e.g. the current
:attr:`active_timeline_guid`), that timeline is searched first. This
ensures that annotations are written to the clip timeline's annotation
track while in clip mode, rather than the sequence timeline's track.
:param clip_guid: Sync GUID of the media clip.
:param preferred_timeline_guid: GUID of the timeline to search first;
falls back to all timelines if not found there.
:returns: Annotation track GUID, or ``None`` if not found.
:rtype: str or None
"""
timelines = list(self._timelines.values())
if preferred_timeline_guid:
pref = self._timelines.get(preferred_timeline_guid)
if pref is not None:
timelines = [pref] + [t for t in timelines if t is not pref]
for timeline in timelines:
clip_found = False
for track in timeline.tracks:
if "annotation" in (track.name or "").lower():
continue
for item in track:
if item.metadata.get("sync", {}).get("guid") == clip_guid:
clip_found = True
break
if clip_found:
break
if not clip_found:
continue
for track in timeline.tracks:
if track.name and "annotation" in track.name.lower():
return track.metadata.get("sync", {}).get("guid")
return None
[docs]
def annotation_clip_guid_at(self, clip_guid: str, frame: int) -> "str | None":
"""Return the sync GUID of the annotation clip at *(clip_guid, frame)*.
Convenience wrapper around :meth:`annotation_track_guid_for_clip` and
:meth:`_find_annotation_clip_at` that returns the clip's own GUID
rather than the object itself.
:param clip_guid: GUID of the media clip being annotated.
:param frame: 0-indexed clip-local frame number.
:returns: Annotation clip GUID, or ``None`` if not found.
:rtype: str or None
"""
ann_track_guid = self.annotation_track_guid_for_clip(clip_guid)
if ann_track_guid is None:
return None
ann_track = self._object_map.get(ann_track_guid)
if ann_track is None:
return None
clip = self._find_annotation_clip_at(ann_track, clip_guid, frame)
if clip is None:
return None
return clip.metadata.get("sync", {}).get("guid")
[docs]
def count_annotation_commands(
self, clip_guid: str, frame: int
) -> "tuple[int, int]":
"""Return ``(n_strokes, n_captions)`` already committed for *(clip_guid, frame)*.
Counts ``PaintStart`` events (strokes) and ``TextAnnotation`` events
(captions) in the annotation track. Accumulates across all matching
clips at the same frame so that old snapshots containing per-stroke
clips are handled correctly.
:param clip_guid: GUID of the media clip being annotated.
:param frame: 0-indexed clip-local frame number.
:returns: ``(n_strokes, n_captions)`` already in the annotation track.
:rtype: tuple
"""
ann_track_guid = self.annotation_track_guid_for_clip(clip_guid)
if ann_track_guid is None:
return 0, 0
ann_track = self._object_map.get(ann_track_guid)
if ann_track is None:
return 0, 0
n_strokes = 0
n_captions = 0
for item in ann_track:
if not isinstance(item, otio.schema.Clip):
continue
if item.metadata.get("clip_guid") != clip_guid:
continue
sr = getattr(item, "source_range", None)
if sr is None or int(sr.start_time.value) != frame:
continue
for cmd in item.metadata.get("annotation_commands", []):
schema = sync_event_schema(cmd)
if schema.startswith("PaintStart"):
n_strokes += 1
elif schema.startswith("TextAnnotation"):
n_captions += 1
return n_strokes, n_captions
[docs]
def broadcast_add_annotation(
self,
annotation_track_guid: str,
clip_guid: str,
clip_local_time: otio.opentime.RationalTime,
events: list[dict[str, Any]],
) -> "str | None":
"""Build an annotation clip and insert it via the standard patch path.
Annotations are expressed as ``insert_child`` patches so that all peers
apply them through the same code path as any other timeline mutation.
The annotation track mirrors the structure produced by
:meth:`ORIAnnotations.ReviewItem._export_otio_media`: each annotated
frame is a 1-frame :class:`~opentimelineio.schema.Clip` and the gaps
between annotated frames are :class:`~opentimelineio.schema.Gap` objects
whose duration is ``frame − track_end`` frames. A second stroke on an
already-annotated frame merges its commands into the existing clip rather
than inserting a duplicate.
:param annotation_track_guid: GUID of the target Annotations track.
:param clip_guid: GUID of the media clip being annotated.
:param clip_local_time: 0-indexed time within the clip's source range.
:param events: Serialised OTIO SyncEvent dicts (``PaintStart.1``,
``PaintPoints.1``) as produced by ``otio.adapters.write_to_string``.
:returns: The sync GUID of the annotation clip that was created or
merged into, or ``None`` if the operation could not be completed.
:rtype: str or None
"""
if not self.network or self.status != STATE_SYNCED:
return
if annotation_track_guid not in self._object_map:
_log(f"broadcast_add_annotation: annotation track {annotation_track_guid} not found")
return
otio_events: list[otio.core.SerializableObject] = []
for e in events:
try:
otio_events.append(_dict_to_otio(e) if isinstance(e, dict) else e)
except Exception as exc:
_log(f"broadcast_add_annotation: failed to deserialise event: {exc}")
annotation_track = self._object_map[annotation_track_guid]
frame = int(clip_local_time.value)
fps = clip_local_time.rate
existing = self._find_annotation_clip_at(annotation_track, clip_guid, frame)
if existing is not None:
# A clip already exists at this frame — merge the new commands in locally
# and broadcast a delta clip so peers can apply the same merge.
existing.metadata["annotation_commands"].extend(otio_events)
delta_clip = self._make_annotation_clip(clip_guid, clip_local_time, otio_events)
self._ensure_guid_and_map(delta_clip)
self._send_message(InsertChild(
parent_uuid=annotation_track_guid,
index=-1,
child_data=delta_clip,
sync_timestamp=time.time(),
))
return existing.metadata.get("sync", {}).get("guid")
else:
# New frame — insert a Gap to reach it (if needed) then the clip.
track_end = self._annotation_track_end(annotation_track)
if frame > track_end:
gap = otio.schema.Gap(
source_range=otio.opentime.TimeRange(
start_time=otio.opentime.RationalTime(track_end, fps),
duration=otio.opentime.RationalTime(frame - track_end, fps),
)
)
self.insert_child(annotation_track_guid, gap)
ann_clip = self._make_annotation_clip(clip_guid, clip_local_time, otio_events)
self.insert_child(annotation_track_guid, ann_clip)
return ann_clip.metadata.get("sync", {}).get("guid")
[docs]
def broadcast_partial_annotation(
self,
clip_guid: str,
frame: float,
fps: float,
events: list,
) -> None:
"""Broadcast a mid-stroke partial annotation to peers (visual only, no timeline persistence).
Called periodically while the user is drawing a stroke, before pen-up.
Peers render the stroke visually but do **not** write it to the OTIO
timeline — that happens on pen-up via :meth:`broadcast_add_annotation`.
:param clip_guid: Sync GUID of the media clip being annotated.
:param frame: 0-indexed clip-local frame number.
:param fps: Frame rate used to interpret *frame*.
:param events: Serialised SyncEvent dicts (``PaintStart.1``, ``PaintPoints.1``).
"""
if not self.network or self.status != STATE_SYNCED:
return
self._send_message(PartialAnnotation(
clip_guid=clip_guid,
frame=frame,
fps=fps,
events=[_otio_to_dict(e) if not isinstance(e, dict) else e for e in events],
))
[docs]
def broadcast_replace_annotation_commands(
self,
annotation_clip_guid: str,
events: list,
) -> None:
"""Replace all annotation_commands on an existing clip and broadcast to peers.
Used when the user edits text in an annotation in place — the command
count stays the same but the text content changes. Sends a
``REPLACE_ANNOTATION_COMMANDS`` message so peers replace the full
command list rather than appending a delta.
:param annotation_clip_guid: Sync GUID of the annotation clip to update.
:param events: Full replacement list of SyncEvent objects (strokes +
captions) representing the current annotation state.
"""
if not self.network or self.status != STATE_SYNCED:
return
clip = self._object_map.get(annotation_clip_guid)
if clip is None:
_log(f"broadcast_replace_annotation_commands: clip {annotation_clip_guid} not found")
return
otio_events: list[otio.core.SerializableObject] = []
for e in events:
try:
otio_events.append(_dict_to_otio(e) if isinstance(e, dict) else e)
except Exception as exc:
_log(f"broadcast_replace_annotation_commands: failed to deserialise event: {exc}")
clip.metadata["annotation_commands"] = otio_events
self._send_message(ReplaceAnnotationCommands(
annotation_clip_guid=annotation_clip_guid,
commands=list(otio_events),
sync_timestamp=time.time(),
))
[docs]
def broadcast_selection(self, clip_guid: str, view_mode: str = "source") -> None:
"""Broadcast the selected clip GUID to all peers.
:param clip_guid: OTIO sync GUID of the selected clip. Receivers
map this back to their local representation (RV source group,
xStudio playlist position, etc.) before applying.
:param view_mode: View mode string ("source" or "sequence").
:type view_mode: str
"""
if self._is_syncing or not self.network or self.status != STATE_SYNCED:
return
self._send_message(SelectionSet(
clip_guid=clip_guid, view_mode=view_mode, sync_timestamp=time.time(),
))
[docs]
def broadcast_move_child(
self, parent_uuid: str, child_uuid: str, to_index: int
) -> None:
"""Move *child_uuid* to *to_index* within its parent and broadcast the change.
Applies the reorder locally before broadcasting so the local OTIO model
stays consistent regardless of network round-trip time.
:param parent_uuid: GUID of the parent container.
:param child_uuid: GUID of the child to move.
:param to_index: Target position in the parent's child list.
"""
if self._is_syncing:
_log("broadcast_move_child: skipped (_is_syncing)")
return
if not self.network:
_log("broadcast_move_child: skipped (no network)")
return
if self.status != STATE_SYNCED:
_log(f"broadcast_move_child: skipped (status={self.status})")
return
msg = self.patcher.move_child(parent_uuid, child_uuid, to_index)
if msg:
self._send_message(msg)
[docs]
def broadcast_remove_child(self, parent_uuid: str, child_uuid: str) -> None:
"""Remove *child_uuid* from its parent and broadcast the change.
The child is removed from both the parent container and ``_object_map``.
:param parent_uuid: GUID of the parent container.
:param child_uuid: GUID of the child to remove.
"""
if self._is_syncing or not self.network or self.status != STATE_SYNCED:
return
msg = self.patcher.remove_child(parent_uuid, child_uuid)
if msg:
_log(f"broadcast_remove_child: removed {child_uuid} from {parent_uuid}")
self._send_message(msg)
# ------------------------------------------------------------------
# Message Handling
# ------------------------------------------------------------------
[docs]
def apply_patch(self, payload: dict[str, Any]) -> tuple[str, Any] | None:
"""Apply a single incoming message from the network.
Dispatches on ``command_schema`` and ``event`` fields. Returns an
``(action, data)`` tuple when the caller needs to act (e.g. to update RV
state), or ``None`` when the message was fully handled internally.
Messages from :attr:`self_guid` are silently discarded. Messages arriving
during ``STATE_JOINING`` are buffered (except session messages) and replayed
by :meth:`apply_snapshot`.
:param payload: Parsed message envelope received from the network.
:returns: ``(action_name, action_data)`` or ``None``.
"""
source = payload.get("source_guid", "unknown")
if source == self.self_guid:
return None
inner_payload = payload.get("payload", {})
command_schema = inner_payload.get("command_schema")
command_block = inner_payload.get("command", {})
event = command_block.get("event")
data = command_block.get("payload", {})
_log(f"apply_patch: command_schema={command_schema} event={event} source={source[:8]}")
if self.status == STATE_JOINING and command_schema != "LiveSession.1":
self._delta_buffer.append(payload)
return None
handler = self._handlers.get((command_schema, event))
msg_cls = message_for(command_schema, event)
if handler is None or msg_cls is None:
# Unknown (command_schema, event) — ignore safely and keep going.
return None
self._is_syncing = True
try:
msg = msg_cls.from_payload(data)
return handler(msg, data, source)
finally:
self._is_syncing = False
# ------------------------------------------------------------------
# Receive-side handlers (registered in ``self._handlers``)
# ------------------------------------------------------------------
def _h_who_is_master(
self, msg: WhoIsMaster, data: dict[str, Any], source: str
) -> "tuple[str, Any] | None":
if self.is_master:
self.broadcast_master_response()
elif self.status == STATE_SYNCED:
self._last_who_is_master_time = time.time()
return None
def _h_i_am_master(
self, msg: IAmMaster, data: dict[str, Any], source: str
) -> "tuple[str, Any] | None":
self.master_guid = msg.master_guid
self._last_who_is_master_time = None
if self.status == STATE_DISCOVERING:
return ("master_found", self.master_guid)
return None
def _h_state_request(
self, msg: StateRequest, data: dict[str, Any], source: str
) -> "tuple[str, Any] | None":
if not self.is_master:
return None
requester = msg.requester_guid or source
return ("state_request_received", requester)
def _h_state_snapshot(
self, msg: StateSnapshot, data: dict[str, Any], source: str
) -> "tuple[str, Any] | None":
if msg.target_guid == self.self_guid:
return ("state_snapshot_received", data)
return None
def _h_playback_set(
self, msg: PlaybackSettingsSet, data: dict[str, Any], source: str
) -> "tuple[str, Any] | None":
self.playback_state = data
# Sync active_timeline_guid so passive peers (e.g. the sync viewer)
# automatically follow the master when it switches between sequences.
# Skip clip-level timelines: those are single-clip artefacts that live
# alongside the sequence timeline and should not shadow the sequence
# view on passive peers.
tl_guid = msg.timeline_guid
if (tl_guid
and tl_guid in self._timelines
and tl_guid not in self._clip_timelines.values()):
self.active_timeline_guid = tl_guid
for cb in self._playback_callbacks:
try:
cb(data)
except Exception as e:
_log(f"on_playback_changed callback error: {e}")
return ("playback_settings", data)
def _h_display_set(
self, msg: DisplaySettingsSet, data: dict[str, Any], source: str
) -> "tuple[str, Any] | None":
self.display_state = data
tl = self.root_timeline
if tl is not None:
tl.metadata["display_settings"] = {
k: v for k, v in data.items() if k != "sync_timestamp"
}
for cb in self._display_callbacks:
try:
cb(data)
except Exception as e:
_log(f"on_display_changed callback error: {e}")
return ("display_settings", data)
def _h_selection_set(
self, msg: SelectionSet, data: dict[str, Any], source: str
) -> "tuple[str, Any] | None":
# Track the clip the master has selected so the sync viewer can
# highlight it even when scrubbing is paused.
self.selected_clip_guid = msg.clip_guid or None
return ("selection_changed", data)
def _h_add_timeline(
self, msg: AddTimeline, data: dict[str, Any], source: str
) -> "tuple[str, Any] | None":
tl_guid = msg.timeline_guid
# Check the GUID guard *before* deserializing: a timeline we already
# hold must not pay the as_otio() cost.
if tl_guid and msg.timeline and tl_guid not in self._timelines:
tl = msg.as_otio()
self._timelines[tl_guid] = tl
seq_clip_guid = tl.metadata.get("clip_timeline_for")
if seq_clip_guid:
# Single-clip annotation timeline — preserve canonical
# sequence clip in object_map.
self._traverse_and_map_preserve(tl)
self._clip_timelines[seq_clip_guid] = tl_guid
_log(
f"ADD_TIMELINE: registered clip_tl={tl_guid[:8]} "
f"for seq_clip={str(seq_clip_guid)[:8]}"
)
else:
# Full sequence / playlist timeline — traverse normally and
# notify the host application so it can create the corresponding
# viewer containers.
self._traverse_and_map(tl)
_log(
f"ADD_TIMELINE: new sequence timeline={tl_guid[:8]}"
f" name={tl.name!r}"
)
return ("add_timeline", tl)
return None
def _h_rename_timeline(
self, msg: RenameTimeline, data: dict[str, Any], source: str
) -> "tuple[str, Any] | None":
tl_guid = msg.timeline_guid
new_name = msg.name
tl = self._timelines.get(tl_guid)
if tl is not None and new_name:
tl.name = new_name
_log(f"RENAME_TIMELINE: {tl_guid[:8]} → {new_name!r}")
return ("timeline_renamed", data)
def _h_partial_annotation(
self, msg: PartialAnnotation, data: dict[str, Any], source: str
) -> "tuple[str, Any] | None":
return ("partial_annotation", data)
def _h_otio_session(
self, msg: ProtocolMessage, data: dict[str, Any], source: str
) -> "tuple[str, Any] | None":
return self.patcher.apply_patch(msg)
[docs]
def tick(self) -> list[tuple[str, Any]]:
"""Poll the network and auto-advance the session handshake.
This is the recommended entry point for new client integrations.
It wraps :meth:`receive_and_apply_all` and handles the session
state machine automatically:
* ``master_found`` → calls :meth:`request_state` internally.
* ``state_snapshot_received`` → calls :meth:`apply_snapshot` internally.
* ``state_request_received`` → **returned to caller**; the master must
respond by calling :meth:`send_state_snapshot`.
Application-level events (``playback_settings``, ``selection_changed``,
``annotation_*``, ``insert_child``, …) are returned so the caller can
react to them. Playback updates are also delivered through the
:meth:`on_playback_changed` callback if one is registered.
Compare with :meth:`receive_and_apply_all`, which returns every raw
action tuple and leaves the handshake entirely to the caller.
:returns: List of ``(action, data)`` tuples requiring application
action (subset of what :meth:`receive_and_apply_all` would return).
"""
app_events: list[tuple[str, Any]] = []
for action, data in self.receive_and_apply_all():
if action == "master_found":
self.request_state()
elif action == "state_snapshot_received":
# Replay results (buffered deltas newer than the snapshot) are
# forwarded so callers react to them just like live events.
replay = self.apply_snapshot(data)
if "playback_state" in data:
self.playback_state = data["playback_state"]
if "display_state" in data:
self.display_state = data["display_state"]
app_events.append(("display_settings", self.display_state))
app_events.extend(replay)
else:
app_events.append((action, data))
# Check for master failover
if (not self.is_master
and self.status == STATE_SYNCED
and getattr(self, "_last_who_is_master_time", None) is not None):
if time.time() - self._last_who_is_master_time > 2.0:
_log("Master did not respond to WHO_IS_MASTER. Promoting self to master.")
self.is_master = True
self.master_guid = self.self_guid
self._last_who_is_master_time = None
self.broadcast_master_response()
# Check for state snapshot timeout
if (self.status == STATE_JOINING
and getattr(self, "_state_request_time", None) is not None):
if time.time() - self._state_request_time > 5.0:
_log("STATE_REQUEST timed out. Reverting to DISCOVERING.")
self.master_guid = None
self._state_request_time = None
self._set_status(STATE_DISCOVERING)
app_events.append(("state_request_timeout", None))
return app_events
[docs]
def receive_and_apply_all(self) -> list[tuple[str, Any]]:
"""Drain the network and apply every pending message.
:returns: List of ``(action, data)`` tuples for messages that require a
response from the caller (e.g. to update RV state). Empty when all
messages were handled internally or no messages were waiting.
"""
if not self.network:
return []
results: list[tuple[str, Any]] = []
for p in self.network.receive_payloads():
res = self.apply_patch(p)
if res:
results.append(res)
return results
[docs]
def apply_snapshot(self, snapshot_data: dict[str, Any]) -> list[tuple[str, Any]]:
"""Replace local state with a full snapshot and replay buffered deltas.
Clears ``_object_map`` and ``_timelines``, deserialises the timelines from
*snapshot_data*, then replays any buffered messages whose ``sync_timestamp``
is newer than the snapshot. Transitions :attr:`status` to ``STATE_SYNCED``.
:param snapshot_data: ``payload`` dict from a ``STATE_SNAPSHOT`` message.
:returns: List of ``(action, data)`` tuples produced by replaying buffered
deltas; to be handled by the caller in the same way as the return value
of :meth:`receive_and_apply_all`.
"""
timestamp: float = snapshot_data.get("snapshot_timestamp", 0)
self._is_syncing = True
try:
self._timelines = {}
self._object_map = {}
self._clip_timelines = {}
# Sort so sequence timelines are processed before clip timelines.
# This guarantees the sequence clip is canonical in _object_map
# before the clip-timeline copy is registered via setdefault.
tl_items = sorted(
snapshot_data.get("timelines", {}).items(),
key=lambda kv: bool(kv[1].get("metadata", {}).get("clip_timeline_for")),
)
for guid, tl_dict in tl_items:
tl = _dict_to_otio(tl_dict)
self._timelines[guid] = tl
is_clip_tl = bool(tl.metadata.get("clip_timeline_for"))
if is_clip_tl:
self._traverse_and_map_preserve(tl)
seq_clip_guid = tl.metadata["clip_timeline_for"]
self._clip_timelines[seq_clip_guid] = guid
else:
self._traverse_and_map(tl)
self.active_timeline_guid = snapshot_data.get("active_timeline_guid")
if "playback_state" in snapshot_data:
self.playback_state = snapshot_data["playback_state"]
# Restore display_state: prefer the explicit snapshot field; fall back
# to timeline custom_metadata written by a previous session to disk.
if "display_state" in snapshot_data:
self.display_state = snapshot_data["display_state"]
else:
for tl in self._timelines.values():
ds = tl.metadata.get("display_settings")
if ds:
self.display_state = dict(ds)
break
replay_results: list[tuple[str, Any]] = []
for payload in self._delta_buffer:
p_data = payload.get("payload", {})
p_time: float = p_data.get("sync_timestamp", 0)
if p_time > timestamp:
res = self.apply_patch(payload)
if res:
replay_results.append(res)
self._delta_buffer = []
self._state_request_time = None
self._set_status(STATE_SYNCED)
return replay_results
finally:
self._is_syncing = False
[docs]
def close(self) -> None:
"""Stop the network backend and release all resources."""
if self.network:
self.network.stop()