Source code for otio_sync_core.protocol_messages

"""Typed protocol message definitions for the OTIO Sync transport layer.

Each message class defined here is the **single source of truth** for one
transport-layer message: its ``command_schema``, its ``event`` name, and the
shape of its payload.  This mirrors how :mod:`SyncEvent` is the source of truth
for the OTIO message layer, and lets a documentation generator describe the
protocol directly from these classes (see ``docs/`` generator).

Design constraints (see the ``typed-protocol-messages`` change design doc):

* Messages are **pure data** — handler logic lives in the manager/patcher, not
  on the message classes — so the classes stay importable in isolation for
  documentation.
* Registration is explicit via the :func:`register` decorator, keyed on
  ``(SCHEMA, EVENT)``, so the receive-side dispatch registry cannot drift from
  the definitions.
* Serialization is explicit: ``to_payload()`` builds a plain ``dict`` without
  reflective whole-object walking (no :func:`dataclasses.asdict`) and without
  per-message ``isinstance`` validation, so hot-path messages
  (:class:`PartialAnnotation`, :class:`PlaybackSettingsSet`) stay cheap.
* The settings messages declare their known fields for documentation but
  **tolerate** unknown fields (carried in ``extras``) for forward-compatibility
  with independent producers.
"""

from __future__ import annotations

import json
from dataclasses import MISSING, dataclass, field, fields
from typing import Any, ClassVar

# ---------------------------------------------------------------------------
# OTIO wire conversion
#
# These helpers are the single place that converts between OTIO objects and
# their wire form.  ``opentimelineio`` is imported lazily *inside* them so this
# module stays importable without OTIO installed (the documentation generator
# only reads class/field metadata, never calls these).  The format must stay
# byte-identical to the prior call-site serialization (``otio_json``,
# ``indent=-1``) so peers on older code keep interoperating.
# ---------------------------------------------------------------------------


def _to_wire(obj: Any) -> Any:
    """Serialize an OTIO object to its wire ``dict``; pass through a ``dict``.

    :param obj: An OTIO ``SerializableObject`` or an already-serialized dict.
    :returns: The wire-form dict.
    """
    if isinstance(obj, dict):
        return obj
    import opentimelineio as otio

    return json.loads(otio.adapters.write_to_string(obj, "otio_json", indent=-1))


def _from_wire(data: Any) -> Any:
    """Deserialize a wire ``dict`` to an OTIO object; pass through a non-dict.

    :param data: A wire-form dict, or an already-deserialized OTIO object.
    :returns: The OTIO ``SerializableObject``.
    """
    if not isinstance(data, dict):
        return data
    import opentimelineio as otio

    return otio.adapters.read_from_string(json.dumps(data), "otio_json")


# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------

#: Maps ``(command_schema, event)`` to the message class that defines it.
_REGISTRY: dict[tuple[str, str], type["ProtocolMessage"]] = {}


[docs] def register(cls: type["ProtocolMessage"]) -> type["ProtocolMessage"]: """Register *cls* in the protocol registry keyed on ``(SCHEMA, EVENT)``. Used as a class decorator. Raises if two classes claim the same ``(SCHEMA, EVENT)`` pair, so collisions surface at import time. :param cls: A :class:`ProtocolMessage` subclass with ``SCHEMA``/``EVENT`` set. :returns: *cls* unchanged (decorator-compatible). """ key = (cls.SCHEMA, cls.EVENT) if not cls.SCHEMA or not cls.EVENT: raise ValueError(f"{cls.__name__} must define non-empty SCHEMA and EVENT") if key in _REGISTRY: raise ValueError( f"Duplicate protocol message registration for {key}: " f"{_REGISTRY[key].__name__} and {cls.__name__}" ) _REGISTRY[key] = cls return cls
[docs] def message_for(command_schema: str, event: str) -> "type[ProtocolMessage] | None": """Return the message class for ``(command_schema, event)``, or ``None``. :param command_schema: Envelope ``command_schema`` value. :param event: Envelope ``command.event`` value. :returns: The registered :class:`ProtocolMessage` subclass, or ``None`` when the pair is unknown (caller should ignore the message safely). """ return _REGISTRY.get((command_schema, event))
[docs] def registered_messages() -> dict[tuple[str, str], type["ProtocolMessage"]]: """Return a copy of the full ``(schema, event) -> class`` registry. Used by the documentation generator to enumerate every protocol message. """ return dict(_REGISTRY)
[docs] def doc_field( *, default: Any = MISSING, default_factory: Any = MISSING, doc: str = "", ): """Declare a dataclass field carrying a documentation string in metadata. The documentation generator reads ``field.metadata["doc"]`` for each field. :param default: Default value (mutually exclusive with *default_factory*). :param default_factory: Zero-arg callable producing the default. :param doc: Human-readable description of the field. """ if default_factory is not MISSING: return field(default_factory=default_factory, metadata={"doc": doc}) if default is not MISSING: return field(default=default, metadata={"doc": doc}) return field(metadata={"doc": doc})
# --------------------------------------------------------------------------- # Base class # ---------------------------------------------------------------------------
[docs] class ProtocolMessage: """Base class for all transport-layer protocol messages. Subclasses are ``@dataclass``-decorated and ``@register``-ed. They set the class-level :attr:`SCHEMA` and :attr:`EVENT` constants and implement :meth:`to_payload` / :meth:`from_payload` explicitly. :cvar SCHEMA: The envelope ``command_schema`` for this message. :cvar EVENT: The envelope ``command.event`` for this message. :cvar ENVELOPE_SCHEMA: Optional top-level ``schema`` key written on the envelope (only :class:`IAmMaster` uses this for legacy compatibility). """ SCHEMA: ClassVar[str] = "" EVENT: ClassVar[str] = "" ENVELOPE_SCHEMA: ClassVar["str | None"] = None
[docs] def to_payload(self) -> dict[str, Any]: """Return the ``command.payload`` dict for this message.""" raise NotImplementedError
[docs] @classmethod def from_payload(cls, data: dict[str, Any]) -> "ProtocolMessage": """Reconstruct a message instance from a received ``command.payload``.""" raise NotImplementedError
[docs] @classmethod def doc_fields(cls) -> list[tuple[str, str, str]]: """Return ``(name, type, description)`` triples for documentation. Default implementation reads the dataclass fields, skipping the ``extras`` catch-all used by tolerant messages. :returns: List of ``(field_name, type_name, doc)`` tuples. """ out: list[tuple[str, str, str]] = [] for f in fields(cls): # type: ignore[arg-type] if f.name == "extras": continue type_name = getattr(f.type, "__name__", str(f.type)) out.append((f.name, type_name, f.metadata.get("doc", ""))) return out
# --------------------------------------------------------------------------- # Session family — LiveSession.1 # ---------------------------------------------------------------------------
[docs] @register @dataclass class WhoIsMaster(ProtocolMessage): """Master-discovery broadcast asking any existing master to identify itself.""" SCHEMA = "LiveSession.1" EVENT = "WHO_IS_MASTER" requester_guid: str = doc_field(doc="GUID of the peer asking who the master is.")
[docs] def to_payload(self) -> dict[str, Any]: return {"requester_guid": self.requester_guid}
[docs] @classmethod def from_payload(cls, data: dict[str, Any]) -> "WhoIsMaster": return cls(requester_guid=data.get("requester_guid"))
[docs] @register @dataclass class IAmMaster(ProtocolMessage): """Master's response to discovery, announcing itself as session master.""" SCHEMA = "LiveSession.1" EVENT = "I_AM_MASTER" #: Legacy top-level envelope schema preserved for older peers. ENVELOPE_SCHEMA = "SYNC_REVIEW_1.0" master_guid: str = doc_field(doc="GUID of the peer that is the session master.")
[docs] def to_payload(self) -> dict[str, Any]: return {"master_guid": self.master_guid}
[docs] @classmethod def from_payload(cls, data: dict[str, Any]) -> "IAmMaster": return cls(master_guid=data.get("master_guid"))
[docs] @register @dataclass class StateRequest(ProtocolMessage): """Joiner's request to the master for a full state snapshot.""" SCHEMA = "LiveSession.1" EVENT = "STATE_REQUEST" target_guid: str = doc_field(doc="GUID of the master the request is aimed at.") requester_guid: str = doc_field(doc="GUID of the joining peer.")
[docs] def to_payload(self) -> dict[str, Any]: return {"target_guid": self.target_guid, "requester_guid": self.requester_guid}
[docs] @classmethod def from_payload(cls, data: dict[str, Any]) -> "StateRequest": return cls( target_guid=data.get("target_guid"), requester_guid=data.get("requester_guid"), )
[docs] @register @dataclass class StateSnapshot(ProtocolMessage): """Master's full session snapshot sent in response to a state request.""" SCHEMA = "LiveSession.1" EVENT = "STATE_SNAPSHOT" target_guid: str = doc_field(doc="GUID of the joining peer this snapshot is for.") timelines: dict = doc_field( default_factory=dict, doc="Map of timeline GUID to OTIO timeline (objects on send, wire dicts on receive).", ) active_timeline_guid: "str | None" = doc_field( default=None, doc="GUID of the active timeline at snapshot time." ) snapshot_timestamp: "float | None" = doc_field( default=None, doc="Epoch seconds when the snapshot was taken." ) playback_state: "dict | None" = doc_field( default=None, doc="Optional current playback state to seed the joiner." ) display_state: "dict | None" = doc_field( default=None, doc="Optional current display state to seed the joiner." )
[docs] def to_payload(self) -> dict[str, Any]: payload: dict[str, Any] = { "target_guid": self.target_guid, "timelines": {g: _to_wire(tl) for g, tl in self.timelines.items()}, "active_timeline_guid": self.active_timeline_guid, "snapshot_timestamp": self.snapshot_timestamp, } if self.playback_state is not None: payload["playback_state"] = self.playback_state if self.display_state is not None: payload["display_state"] = self.display_state return payload
[docs] @classmethod def from_payload(cls, data: dict[str, Any]) -> "StateSnapshot": return cls( target_guid=data.get("target_guid"), timelines=data.get("timelines", {}), active_timeline_guid=data.get("active_timeline_guid"), snapshot_timestamp=data.get("snapshot_timestamp"), playback_state=data.get("playback_state"), display_state=data.get("display_state"), )
[docs] def as_otio(self) -> dict[str, Any]: """Return ``{guid: OTIO timeline}``, deserializing any wire-form entries.""" return {g: _from_wire(tl) for g, tl in self.timelines.items()}
[docs] @register @dataclass class NewPresenter(ProtocolMessage): """Announces that a peer has become the session presenter.""" SCHEMA = "LiveSession.1" EVENT = "NEW_PRESENTER" presenter_hash: str = doc_field(doc="Hash identifying the new presenter.")
[docs] def to_payload(self) -> dict[str, Any]: return {"presenter_hash": self.presenter_hash}
[docs] @classmethod def from_payload(cls, data: dict[str, Any]) -> "NewPresenter": return cls(presenter_hash=data.get("presenter_hash"))
[docs] @register @dataclass class NewParticipant(ProtocolMessage): """Announces that a new participant has joined the sync review.""" SCHEMA = "LiveSession.1" EVENT = "NEW_PARTICIPANT"
[docs] def to_payload(self) -> dict[str, Any]: return {}
[docs] @classmethod def from_payload(cls, data: dict[str, Any]) -> "NewParticipant": return cls()
[docs] @register @dataclass class SharedKeyRequest(ProtocolMessage): """Requests the session's shared key from a peer.""" SCHEMA = "LiveSession.1" EVENT = "SHARED_KEY_REQUEST" key: str = doc_field(doc="The shared key being requested.")
[docs] def to_payload(self) -> dict[str, Any]: return {"key": self.key}
[docs] @classmethod def from_payload(cls, data: dict[str, Any]) -> "SharedKeyRequest": return cls(key=data.get("key"))
[docs] @register @dataclass class SharedKeyResponse(ProtocolMessage): """Responds to a shared-key request with the session's shared key.""" SCHEMA = "LiveSession.1" EVENT = "SHARED_KEY_RESPONSE" key: str = doc_field(doc="The shared key being returned.")
[docs] def to_payload(self) -> dict[str, Any]: return {"key": self.key}
[docs] @classmethod def from_payload(cls, data: dict[str, Any]) -> "SharedKeyResponse": return cls(key=data.get("key"))
# --------------------------------------------------------------------------- # Timeline family — TIMELINE_1.0 # ---------------------------------------------------------------------------
[docs] @register @dataclass class AddTimeline(ProtocolMessage): """Registers a new timeline (sequence or single-clip) with all peers.""" SCHEMA = "TIMELINE_1.0" EVENT = "ADD_TIMELINE" timeline_guid: str = doc_field(doc="GUID of the timeline being added.") timeline: Any = doc_field( doc="OTIO timeline (object on send, wire dict on receive)." ) sync_timestamp: "float | None" = doc_field( default=None, doc="Epoch seconds when the message was sent." )
[docs] def to_payload(self) -> dict[str, Any]: return { "timeline_guid": self.timeline_guid, "timeline": _to_wire(self.timeline), "sync_timestamp": self.sync_timestamp, }
[docs] @classmethod def from_payload(cls, data: dict[str, Any]) -> "AddTimeline": return cls( timeline_guid=data.get("timeline_guid"), timeline=data.get("timeline"), sync_timestamp=data.get("sync_timestamp"), )
[docs] def as_otio(self) -> Any: """Return the OTIO timeline, deserializing if still in wire form.""" return _from_wire(self.timeline)
[docs] @register @dataclass class RenameTimeline(ProtocolMessage): """Renames an existing timeline on all peers.""" SCHEMA = "TIMELINE_1.0" EVENT = "RENAME_TIMELINE" timeline_guid: str = doc_field(doc="GUID of the timeline to rename.") name: str = doc_field(doc="New display name for the timeline.") sync_timestamp: "float | None" = doc_field( default=None, doc="Epoch seconds when the message was sent." )
[docs] def to_payload(self) -> dict[str, Any]: return { "timeline_guid": self.timeline_guid, "name": self.name, "sync_timestamp": self.sync_timestamp, }
[docs] @classmethod def from_payload(cls, data: dict[str, Any]) -> "RenameTimeline": return cls( timeline_guid=data.get("timeline_guid"), name=data.get("name", ""), sync_timestamp=data.get("sync_timestamp"), )
# --------------------------------------------------------------------------- # Settings family — declare known fields, tolerate extras (hot paths) # ---------------------------------------------------------------------------
[docs] @register @dataclass class PlaybackSettingsSet(ProtocolMessage): """Playback state broadcast. Hot path: fires on frame change during playback/scrubbing. Known fields are declared for documentation; any additional producer fields are preserved in ``extras`` and round-tripped unchanged. """ SCHEMA = "PLAYBACK_SETTINGS_1.0" EVENT = "SET" playing: "bool | None" = doc_field(default=None, doc="Whether playback is running.") current_time: "dict | None" = doc_field( default=None, doc="Current position as a serialized RationalTime." ) looping: "bool | None" = doc_field(default=None, doc="Whether playback loops.") timeline_guid: "str | None" = doc_field( default=None, doc="GUID of the timeline being played." ) sync_timestamp: "float | None" = doc_field( default=None, doc="Epoch seconds when the message was sent." ) extras: dict = field(default_factory=dict) #: Field names modelled explicitly (everything else falls into ``extras``). _KNOWN: ClassVar[tuple[str, ...]] = ( "playing", "current_time", "looping", "timeline_guid", "sync_timestamp", )
[docs] def to_payload(self) -> dict[str, Any]: payload: dict[str, Any] = {} if self.playing is not None: payload["playing"] = self.playing if self.current_time is not None: payload["current_time"] = self.current_time if self.looping is not None: payload["looping"] = self.looping if self.timeline_guid is not None: payload["timeline_guid"] = self.timeline_guid if self.sync_timestamp is not None: payload["sync_timestamp"] = self.sync_timestamp payload.update(self.extras) return payload
[docs] @classmethod def from_payload(cls, data: dict[str, Any]) -> "PlaybackSettingsSet": extras = {k: v for k, v in data.items() if k not in cls._KNOWN} return cls( playing=data.get("playing"), current_time=data.get("current_time"), looping=data.get("looping"), timeline_guid=data.get("timeline_guid"), sync_timestamp=data.get("sync_timestamp"), extras=extras, )
[docs] @register @dataclass class DisplaySettingsSet(ProtocolMessage): """Display state broadcast (pan/zoom/exposure/channel). Known fields are declared for documentation; additional producer fields are preserved in ``extras``. """ SCHEMA = "DISPLAY_SETTINGS_1.0" EVENT = "SET" pan: "list | None" = doc_field(default=None, doc="Normalised [x, y] pan offset.") zoom: "float | None" = doc_field(default=None, doc="Zoom multiplier (1.0 = none).") exposure: "float | None" = doc_field( default=None, doc="Exposure adjustment in stops (0.0 = none)." ) channel: "str | None" = doc_field( default=None, doc='Active channel: "RGBA", "R", "G", "B", or "A".' ) sync_timestamp: "float | None" = doc_field( default=None, doc="Epoch seconds when the message was sent." ) extras: dict = field(default_factory=dict) _KNOWN: ClassVar[tuple[str, ...]] = ( "pan", "zoom", "exposure", "channel", "sync_timestamp", )
[docs] def to_payload(self) -> dict[str, Any]: payload: dict[str, Any] = {} if self.pan is not None: payload["pan"] = self.pan if self.zoom is not None: payload["zoom"] = self.zoom if self.exposure is not None: payload["exposure"] = self.exposure if self.channel is not None: payload["channel"] = self.channel if self.sync_timestamp is not None: payload["sync_timestamp"] = self.sync_timestamp payload.update(self.extras) return payload
[docs] @classmethod def from_payload(cls, data: dict[str, Any]) -> "DisplaySettingsSet": extras = {k: v for k, v in data.items() if k not in cls._KNOWN} return cls( pan=data.get("pan"), zoom=data.get("zoom"), exposure=data.get("exposure"), channel=data.get("channel"), sync_timestamp=data.get("sync_timestamp"), extras=extras, )
# --------------------------------------------------------------------------- # Selection family — SELECTION_1.0 # ---------------------------------------------------------------------------
[docs] @register @dataclass class SelectionSet(ProtocolMessage): """Broadcasts the clip the master has selected and the active view mode.""" SCHEMA = "SELECTION_1.0" EVENT = "SET" clip_guid: str = doc_field(doc="Sync GUID of the selected clip ('' to clear).") view_mode: str = doc_field(default="source", doc='View mode: "source" or "sequence".') sync_timestamp: "float | None" = doc_field( default=None, doc="Epoch seconds when the message was sent." )
[docs] def to_payload(self) -> dict[str, Any]: return { "clip_guid": self.clip_guid, "view_mode": self.view_mode, "sync_timestamp": self.sync_timestamp, }
[docs] @classmethod def from_payload(cls, data: dict[str, Any]) -> "SelectionSet": return cls( clip_guid=data.get("clip_guid"), view_mode=data.get("view_mode", "source"), sync_timestamp=data.get("sync_timestamp"), )
# --------------------------------------------------------------------------- # Annotation family — Annotation.1 (hot path) # ---------------------------------------------------------------------------
[docs] @register @dataclass class PartialAnnotation(ProtocolMessage): """Mid-stroke partial annotation (visual preview, not persisted). Hot path: fires repeatedly while a stroke is being drawn. No validation or reflective serialization is performed. """ SCHEMA = "Annotation.1" EVENT = "PARTIAL" clip_guid: str = doc_field(doc="Sync GUID of the clip being annotated.") frame: float = doc_field(doc="0-indexed clip-local frame number.") fps: float = doc_field(doc="Frame rate used to interpret 'frame'.") # Deliberately kept as serialized SyncEvent dicts (NOT typed OTIO objects): # this is the hottest path and the host codec already produces dicts, so a # typed field would force wasteful deserialize-then-reserialize churn. events: list = doc_field( default_factory=list, doc="Serialized SyncEvent dicts for the in-progress stroke." )
[docs] def to_payload(self) -> dict[str, Any]: return { "clip_guid": self.clip_guid, "frame": self.frame, "fps": self.fps, "events": self.events, }
[docs] @classmethod def from_payload(cls, data: dict[str, Any]) -> "PartialAnnotation": return cls( clip_guid=data.get("clip_guid"), frame=data.get("frame"), fps=data.get("fps"), events=data.get("events", []), )
# --------------------------------------------------------------------------- # OTIO session family — OTIO_SESSION_1.0 # Single definition: built and consumed by patcher.py. Payloads carry the # wire form (already-serialized child_data / commands), so to_payload is a cheap # field copy. # ---------------------------------------------------------------------------
[docs] @register @dataclass class SetProperty(ProtocolMessage): """Sets a property or metadata path on an object.""" SCHEMA = "OTIO_SESSION_1.0" EVENT = "SET_PROPERTY" target_uuid: str = doc_field(doc="GUID of the target object.") path: str = doc_field(doc="Property name or 'metadata/...' sub-path.") value: Any = doc_field(doc="New primitive value.") sync_timestamp: "float | None" = doc_field( default=None, doc="Epoch seconds when the mutation occurred." )
[docs] def to_payload(self) -> dict[str, Any]: return { "target_uuid": self.target_uuid, "path": self.path, "value": self.value, "sync_timestamp": self.sync_timestamp, }
[docs] @classmethod def from_payload(cls, data: dict[str, Any]) -> "SetProperty": return cls( target_uuid=data.get("target_uuid"), path=data.get("path"), value=data.get("value"), sync_timestamp=data.get("sync_timestamp"), )
[docs] @register @dataclass class InsertChild(ProtocolMessage): """Inserts a child object into a parent container.""" SCHEMA = "OTIO_SESSION_1.0" EVENT = "INSERT_CHILD" parent_uuid: str = doc_field(doc="GUID of the parent container.") child_data: Any = doc_field( doc="OTIO child object (object on send, wire dict on receive)." ) index: int = doc_field(default=-1, doc="Insert position; -1 appends.") sync_timestamp: "float | None" = doc_field( default=None, doc="Epoch seconds when the mutation occurred." )
[docs] def to_payload(self) -> dict[str, Any]: return { "parent_uuid": self.parent_uuid, "index": self.index, "child_data": _to_wire(self.child_data), "sync_timestamp": self.sync_timestamp, }
[docs] @classmethod def from_payload(cls, data: dict[str, Any]) -> "InsertChild": return cls( parent_uuid=data.get("parent_uuid"), child_data=data.get("child_data"), index=data.get("index", -1), sync_timestamp=data.get("sync_timestamp"), )
[docs] def as_otio(self) -> Any: """Return the OTIO child object, deserializing if still in wire form.""" return _from_wire(self.child_data)
[docs] @register @dataclass class MoveChild(ProtocolMessage): """Moves a child to a new index within its parent container.""" SCHEMA = "OTIO_SESSION_1.0" EVENT = "MOVE_CHILD" parent_uuid: str = doc_field(doc="GUID of the parent container.") child_uuid: str = doc_field(doc="GUID of the child to move.") to_index: int = doc_field(default=0, doc="Target position in the parent.") sync_timestamp: "float | None" = doc_field( default=None, doc="Epoch seconds when the mutation occurred." )
[docs] def to_payload(self) -> dict[str, Any]: return { "parent_uuid": self.parent_uuid, "child_uuid": self.child_uuid, "to_index": self.to_index, "sync_timestamp": self.sync_timestamp, }
[docs] @classmethod def from_payload(cls, data: dict[str, Any]) -> "MoveChild": return cls( parent_uuid=data.get("parent_uuid"), child_uuid=data.get("child_uuid"), to_index=data.get("to_index", 0), sync_timestamp=data.get("sync_timestamp"), )
[docs] @register @dataclass class RemoveChild(ProtocolMessage): """Removes a child from its parent container.""" SCHEMA = "OTIO_SESSION_1.0" EVENT = "REMOVE_CHILD" parent_uuid: str = doc_field(doc="GUID of the parent container.") child_uuid: str = doc_field(doc="GUID of the child to remove.") sync_timestamp: "float | None" = doc_field( default=None, doc="Epoch seconds when the mutation occurred." )
[docs] def to_payload(self) -> dict[str, Any]: return { "parent_uuid": self.parent_uuid, "child_uuid": self.child_uuid, "sync_timestamp": self.sync_timestamp, }
[docs] @classmethod def from_payload(cls, data: dict[str, Any]) -> "RemoveChild": return cls( parent_uuid=data.get("parent_uuid"), child_uuid=data.get("child_uuid"), sync_timestamp=data.get("sync_timestamp"), )
[docs] @register @dataclass class ReplaceAnnotationCommands(ProtocolMessage): """Replaces the full annotation-command list on an annotation clip.""" SCHEMA = "OTIO_SESSION_1.0" EVENT = "REPLACE_ANNOTATION_COMMANDS" annotation_clip_guid: str = doc_field(doc="GUID of the annotation clip to update.") commands: list = doc_field( default_factory=list, doc="Full replacement list of OTIO SyncEvents (objects on send, wire dicts on receive).", ) sync_timestamp: "float | None" = doc_field( default=None, doc="Epoch seconds when the mutation occurred." )
[docs] def to_payload(self) -> dict[str, Any]: return { "annotation_clip_guid": self.annotation_clip_guid, "commands": [_to_wire(c) for c in self.commands], "sync_timestamp": self.sync_timestamp, }
[docs] @classmethod def from_payload(cls, data: dict[str, Any]) -> "ReplaceAnnotationCommands": return cls( annotation_clip_guid=data.get("annotation_clip_guid"), commands=data.get("commands", []), sync_timestamp=data.get("sync_timestamp"), )
[docs] def as_otio(self) -> list: """Return the OTIO SyncEvent list, deserializing any wire-form entries.""" return [_from_wire(c) for c in self.commands]