# Source code for otio_sync_core.patcher

"""Transport-agnostic patching engine for OpenTimelineIO (OTIO) graphs."""

from __future__ import annotations

import json
import logging
import time
import uuid
from typing import Any, Callable

import opentimelineio as otio

from .protocol_messages import (
    ProtocolMessage,
    InsertChild,
    MoveChild,
    RemoveChild,
    SetProperty,
    ReplaceAnnotationCommands,
)

_logger = logging.getLogger("otio_sync")


def _log(msg: str) -> None:
    """Emit *msg* at DEBUG level on the ``otio_sync`` logger.

    Nothing is emitted unless at least one handler is attached directly to
    that logger.

    :param msg: Text to log.
    """
    if not _logger.handlers:
        return
    _logger.debug(msg)


def _otio_to_dict(obj: otio.core.SerializableObject) -> dict[str, Any]:
    """Convert *obj* to plain JSON-compatible data.

    The object is written with the ``otio_json`` adapter (``indent=-1``) and
    the resulting string is parsed back with :func:`json.loads`.

    :param obj: OTIO object to serialize.
    :returns: The parsed JSON document.
    """
    serialized = otio.adapters.write_to_string(obj, "otio_json", indent=-1)
    return json.loads(serialized)


def _dict_to_otio(d: dict[str, Any]) -> otio.core.SerializableObject:
    """Rebuild an OTIO object from its JSON-compatible dictionary form.

    The dictionary is dumped to a JSON string and read back through the
    ``otio_json`` adapter.

    :param d: Dictionary to deserialize.
    :returns: Whatever object the adapter reads from the JSON text.
    """
    encoded = json.dumps(d)
    return otio.adapters.read_from_string(encoded, "otio_json")


class OTIOPatcher:
    """Manages the lifecycle of OTIO graph patches.

    Tracks object GUIDs, observes mutations, and applies patch events (such as
    property changes or hierarchy insertions/moves/removals) to the local
    graph.
    """

    def __init__(self) -> None:
        # GUID (``metadata["sync"]["guid"]``) -> indexed OTIO object.
        self.object_map: dict[str, otio.core.SerializableObject] = {}
        # True only for the duration of apply_patch().
        self._is_syncing: bool = False
        self._property_callbacks: list[Callable[[str, str, Any], None]] = []
        self._hierarchy_callbacks: list[Callable[[str, str, str], None]] = []

    # ------------------------------------------------------------------
    # Observer registration and notification
    # ------------------------------------------------------------------

    def on_property_changed(
        self, callback: Callable[[str, str, Any], None]
    ) -> Callable[[str, str, Any], None]:
        """Register a callback for property change events.

        :param callback: Callable receiving ``(target_uuid, path, new_value)``.
        :returns: The *callback* unchanged (decorator-compatible).
        :rtype: Callable
        """
        self._property_callbacks.append(callback)
        return callback

    def on_hierarchy_changed(
        self, callback: Callable[[str, str, str], None]
    ) -> Callable[[str, str, str], None]:
        """Register a callback for hierarchy change events.

        :param callback: Callable receiving ``(parent_uuid, action, child_uuid)``
            where *action* is one of ``"insert_child"``, ``"remove_child"``,
            or ``"move_child"``.
        :returns: The *callback* unchanged (decorator-compatible).
        :rtype: Callable
        """
        self._hierarchy_callbacks.append(callback)
        return callback

    def _fire_property_changed(self, target_uuid: str, path: str, value: Any) -> None:
        """Call every property observer; a raising observer is logged and skipped."""
        for cb in self._property_callbacks:
            try:
                cb(target_uuid, path, value)
            except Exception as e:
                # Best effort: one broken observer must not block the others.
                _log(f"on_property_changed callback error: {e}")

    def _fire_hierarchy_changed(
        self, parent_uuid: str, action: str, child_uuid: str
    ) -> None:
        """Call every hierarchy observer; a raising observer is logged and skipped."""
        for cb in self._hierarchy_callbacks:
            try:
                cb(parent_uuid, action, child_uuid)
            except Exception as e:
                # Best effort: one broken observer must not block the others.
                _log(f"on_hierarchy_changed callback error: {e}")

    # ------------------------------------------------------------------
    # Shared private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _iter_subtree(node: Any):
        """Yield *node* and then, depth-first, everything nested beneath it.

        A node exposing ``tracks`` contributes that stack and the stack's
        children; any other non-string iterable contributes its elements.
        """
        yield node
        if hasattr(node, "tracks"):
            stack = node.tracks
            yield stack
            for child in stack:
                yield from OTIOPatcher._iter_subtree(child)
        elif hasattr(node, "__iter__") and not isinstance(node, str):
            for child in node:
                yield from OTIOPatcher._iter_subtree(child)

    @staticmethod
    def _ensure_guid(obj: otio.core.SerializableObject) -> Any:
        """Return the sync GUID stored on *obj*, creating one when absent."""
        # Always index through ``obj.metadata`` rather than a local dict so the
        # writes land in the object's own metadata container.
        if "sync" not in obj.metadata:
            obj.metadata["sync"] = {}
        if "guid" not in obj.metadata["sync"]:
            obj.metadata["sync"]["guid"] = str(uuid.uuid4())
        return obj.metadata["sync"]["guid"]

    @staticmethod
    def _assign_property(obj: Any, path: str, value: Any) -> None:
        """Write *value* to attribute *path* or to a ``metadata/...`` sub-key of *obj*.

        Missing intermediate metadata dictionaries are created on the way.
        """
        if not path.startswith("metadata/"):
            setattr(obj, path, value)
            return
        parts = path.split("/")
        curr = obj.metadata
        for part in parts[1:-1]:
            if part not in curr:
                curr[part] = {}
            curr = curr[part]
        curr[parts[-1]] = value

    @staticmethod
    def _index_of_child(parent: Any, child_uuid: str) -> int | None:
        """Return the position in *parent* of the child whose sync GUID is *child_uuid*."""
        return next(
            (
                i
                for i, item in enumerate(parent)
                if item.metadata.get("sync", {}).get("guid") == child_uuid
            ),
            None,
        )

    # ------------------------------------------------------------------
    # GUID indexing
    # ------------------------------------------------------------------

    def traverse_and_map(self, item: otio.core.SerializableObject) -> None:
        """Recursively assign GUIDs to all OTIO objects under *item* and index them.

        Existing ``object_map`` entries with the same GUID are overwritten.

        :param item: Root OTIO object to traverse.
        """
        for obj in self._iter_subtree(item):
            self.ensure_guid_and_map(obj)

    def traverse_and_map_preserve(self, item: otio.core.SerializableObject) -> None:
        """Recursively assign GUIDs to all OTIO objects under *item* without
        overwriting existing entries.

        :param item: Root OTIO object to traverse.
        """
        for obj in self._iter_subtree(item):
            if not isinstance(obj, otio.core.SerializableObject):
                continue
            # setdefault keeps whichever object was indexed first for a GUID.
            self.object_map.setdefault(self._ensure_guid(obj), obj)

    def ensure_guid_and_map(self, obj: Any) -> None:
        """Assign a sync GUID to *obj* if absent, then add it to ``object_map``.

        Non-:class:`~opentimelineio.core.SerializableObject` values are ignored.

        :param obj: Candidate OTIO object.
        """
        if not isinstance(obj, otio.core.SerializableObject):
            return
        self.object_map[self._ensure_guid(obj)] = obj

    # ------------------------------------------------------------------
    # Annotation merging
    # ------------------------------------------------------------------

    @staticmethod
    def _find_annotation_clip_at(
        track: otio.schema.Track,
        clip_guid: str,
        frame: int,
    ) -> otio.schema.Clip | None:
        """Return the first clip in *track* tagged with *clip_guid* that starts at *frame*.

        A clip matches when ``metadata["clip_guid"]`` equals *clip_guid* and the
        integer value of its ``source_range`` start time equals *frame*.
        """
        for child in track:
            if not isinstance(child, otio.schema.Clip):
                continue
            if child.metadata.get("clip_guid") != clip_guid:
                continue
            sr = getattr(child, "source_range", None)
            if sr is not None and int(sr.start_time.value) == frame:
                return child
        return None

    @staticmethod
    def _try_merge_annotation(
        parent: otio.schema.Track,
        child_obj: otio.core.SerializableObject,
    ) -> otio.schema.Clip | None:
        """Append *child_obj*'s annotation commands to a matching clip in *parent*.

        :param parent: Candidate container; anything but a track is rejected.
        :param child_obj: Incoming object that may carry ``annotation_commands``,
            ``clip_guid`` metadata and a ``source_range``.
        :returns: The clip that received the commands, or ``None`` when no merge
            took place and the caller should insert *child_obj* normally.
        """
        if not isinstance(parent, otio.schema.Track):
            return None
        if not hasattr(child_obj, "metadata"):
            return None
        incoming_cmds = child_obj.metadata.get("annotation_commands")
        incoming_cg = child_obj.metadata.get("clip_guid")
        incoming_sr = getattr(child_obj, "source_range", None)
        if not incoming_cmds or not incoming_cg or incoming_sr is None:
            return None
        incoming_frame = int(incoming_sr.start_time.value)
        existing = OTIOPatcher._find_annotation_clip_at(
            parent, incoming_cg, incoming_frame
        )
        if existing is None:
            return None
        existing_cmds = existing.metadata.get("annotation_commands")
        if existing_cmds is None:
            # The lookup matches on clip_guid and frame only, so the clip found
            # may hold no command list. Decline the merge instead of raising
            # KeyError; the caller then inserts child_obj as a regular child.
            return None
        existing_cmds.extend(incoming_cmds)
        return existing

    # ------------------------------------------------------------------
    # Local mutations (each returns the message describing the change)
    # ------------------------------------------------------------------

    def set_property(self, target_uuid: str, path: str, value: Any) -> "ProtocolMessage | None":
        """Set property *path* to *value* on object *target_uuid* locally.

        :param target_uuid: GUID of the target object.
        :param path: Target property or metadata sub-key path (e.g. ``"name"``
            or ``"metadata/custom"``).
        :param value: New value; must be a primitive type.
        :returns: The generated :class:`SetProperty` message, or ``None`` if
            *target_uuid* is not found.
        :rtype: ProtocolMessage or None
        """
        if target_uuid not in self.object_map:
            return None
        self._assign_property(self.object_map[target_uuid], path, value)
        self._fire_property_changed(target_uuid, path, value)
        return SetProperty(
            target_uuid=target_uuid,
            path=path,
            value=value,
            sync_timestamp=time.time(),
        )

    def insert_child(
        self,
        parent_uuid: str,
        child_obj: otio.core.SerializableObject,
        index: int = -1,
    ) -> "ProtocolMessage | None":
        """Insert *child_obj* into the parent container locally.

        *child_obj* and every object nested beneath it receive a GUID and are
        indexed in ``object_map``.

        :param parent_uuid: GUID of the parent container.
        :param child_obj: The OTIO object to insert.
        :param index: Position at which to insert; ``-1`` appends.
        :returns: The generated :class:`InsertChild` message, or ``None`` if
            *parent_uuid* is not found.
        :rtype: ProtocolMessage or None
        """
        if parent_uuid not in self.object_map:
            return None
        parent = self.object_map[parent_uuid]
        # Index the whole inserted subtree, not just its root, so nested
        # objects are addressable by GUID and their GUIDs travel in child_data.
        self.traverse_and_map(child_obj)
        if index == -1:
            parent.append(child_obj)
        else:
            parent.insert(index, child_obj)
        child_uuid = child_obj.metadata["sync"]["guid"]
        self._fire_hierarchy_changed(parent_uuid, "insert_child", child_uuid)
        return InsertChild(
            parent_uuid=parent_uuid,
            index=index,
            child_data=child_obj,
            sync_timestamp=time.time(),
        )

    def remove_child(self, parent_uuid: str, child_uuid: str) -> "ProtocolMessage | None":
        """Remove *child_uuid* from its parent container locally.

        :param parent_uuid: GUID of the parent container.
        :param child_uuid: GUID of the child to remove.
        :returns: The generated :class:`RemoveChild` message, or ``None`` if
            parent or child is not found.
        :rtype: ProtocolMessage or None
        """
        if parent_uuid not in self.object_map:
            return None
        parent = self.object_map[parent_uuid]
        current_index = self._index_of_child(parent, child_uuid)
        if current_index is None:
            return None
        del parent[current_index]
        self.object_map.pop(child_uuid, None)
        self._fire_hierarchy_changed(parent_uuid, "remove_child", child_uuid)
        return RemoveChild(
            parent_uuid=parent_uuid,
            child_uuid=child_uuid,
            sync_timestamp=time.time(),
        )

    def move_child(self, parent_uuid: str, child_uuid: str, to_index: int) -> "ProtocolMessage | None":
        """Move *child_uuid* within its parent container locally.

        :param parent_uuid: GUID of the parent container.
        :param child_uuid: GUID of the child to move.
        :param to_index: Target position in the parent's child list.
        :returns: The generated :class:`MoveChild` message, or ``None`` if
            parent/child is not found or index is unchanged.
        :rtype: ProtocolMessage or None
        """
        if parent_uuid not in self.object_map:
            return None
        parent = self.object_map[parent_uuid]
        current_index = self._index_of_child(parent, child_uuid)
        if current_index is None or current_index == to_index:
            return None
        child = parent[current_index]
        del parent[current_index]
        parent.insert(to_index, child)
        self._fire_hierarchy_changed(parent_uuid, "move_child", child_uuid)
        return MoveChild(
            parent_uuid=parent_uuid,
            child_uuid=child_uuid,
            to_index=to_index,
            sync_timestamp=time.time(),
        )

    # ------------------------------------------------------------------
    # Incoming patches
    # ------------------------------------------------------------------

    def apply_patch(self, msg: "ProtocolMessage") -> tuple[str, Any] | None:
        """Apply an OTIO-session mutation message to the local graph.

        Dispatches on the concrete message type, so the same class that built
        the payload (in :meth:`set_property`, :meth:`insert_child`, etc.) is
        the one used to consume it. ``_is_syncing`` is ``True`` while the patch
        is being applied and is reset to ``False`` afterwards, even on error.

        :param msg: A reconstructed OTIO-session :class:`ProtocolMessage`:
            :class:`SetProperty`, :class:`MoveChild`, :class:`RemoveChild`,
            :class:`ReplaceAnnotationCommands`, or :class:`InsertChild`.
        :returns: An ``(action_name, action_data)`` tuple when the caller needs
            to act, or ``None`` when the message type is unknown or its target
            could not be resolved. Possible tuples are
            ``("set_property", obj)``, ``("move_child", payload)``,
            ``("remove_child", payload)``,
            ``("annotation_commands_replaced", clip)``,
            ``("annotation_commands_added", (merged_clip, child_obj))`` and
            ``("insert_child", child_obj)``.
        :rtype: tuple or None
        """
        self._is_syncing = True
        try:
            if isinstance(msg, SetProperty):
                return self._apply_set_property(msg)
            if isinstance(msg, MoveChild):
                return self._apply_move_child(msg)
            if isinstance(msg, RemoveChild):
                return self._apply_remove_child(msg)
            if isinstance(msg, ReplaceAnnotationCommands):
                return self._apply_replace_annotation_commands(msg)
            if isinstance(msg, InsertChild):
                return self._apply_insert_child(msg)
            return None
        finally:
            self._is_syncing = False

    def _apply_set_property(self, msg: SetProperty) -> tuple[str, Any] | None:
        """Apply a :class:`SetProperty` message; ``None`` when the target is unknown."""
        target_uuid = msg.target_uuid
        if target_uuid not in self.object_map:
            return None
        obj = self.object_map[target_uuid]
        path: str = msg.path
        value: Any = msg.value
        self._assign_property(obj, path, value)
        self._fire_property_changed(target_uuid, path, value)
        return ("set_property", obj)

    def _apply_move_child(self, msg: MoveChild) -> tuple[str, Any] | None:
        """Apply a :class:`MoveChild` message; ``None`` when parent or child is unknown."""
        parent_uuid: str = msg.parent_uuid
        child_uuid: str = msg.child_uuid
        to_index: int = msg.to_index
        parent = self.object_map.get(parent_uuid)
        if parent is None or self.object_map.get(child_uuid) is None:
            return None
        current_index = self._index_of_child(parent, child_uuid)
        if current_index is None:
            return None
        # Move the object that actually sits in the parent, exactly as
        # move_child() does. Re-inserting the object_map entry instead could
        # swap the live child for a stale, detached object with the same GUID.
        child = parent[current_index]
        del parent[current_index]
        parent.insert(to_index, child)
        self._fire_hierarchy_changed(parent_uuid, "move_child", child_uuid)
        return ("move_child", msg.to_payload())

    def _apply_remove_child(self, msg: RemoveChild) -> tuple[str, Any] | None:
        """Apply a :class:`RemoveChild` message; ``None`` when parent or child is unknown."""
        parent_uuid: str = msg.parent_uuid
        child_uuid: str = msg.child_uuid
        parent = self.object_map.get(parent_uuid)
        if parent is None:
            return None
        current_index = self._index_of_child(parent, child_uuid)
        if current_index is None:
            return None
        del parent[current_index]
        self.object_map.pop(child_uuid, None)
        self._fire_hierarchy_changed(parent_uuid, "remove_child", child_uuid)
        return ("remove_child", msg.to_payload())

    def _apply_replace_annotation_commands(
        self, msg: ReplaceAnnotationCommands
    ) -> tuple[str, Any] | None:
        """Overwrite the ``annotation_commands`` metadata of the addressed clip."""
        ann_clip_guid = msg.annotation_clip_guid
        clip = self.object_map.get(ann_clip_guid)
        if clip is None:
            _log(f"REPLACE_ANNOTATION_COMMANDS: clip {ann_clip_guid} not found")
            return None
        clip.metadata["annotation_commands"] = msg.as_otio()
        return ("annotation_commands_replaced", clip)

    def _apply_insert_child(self, msg: InsertChild) -> tuple[str, Any] | None:
        """Apply an :class:`InsertChild` message, merging annotations when possible."""
        parent_uuid: str = msg.parent_uuid
        if parent_uuid not in self.object_map:
            return None
        parent = self.object_map[parent_uuid]
        index: int = msg.index
        child_obj = msg.as_otio()

        merged = self._try_merge_annotation(parent, child_obj)
        if merged is not None:
            # The commands were folded into an existing clip; child_obj itself
            # is not inserted, but it is still indexed and handed to the caller.
            self.ensure_guid_and_map(child_obj)
            return ("annotation_commands_added", (merged, child_obj))

        if index == -1:
            parent.append(child_obj)
        else:
            parent.insert(index, child_obj)
        # Index the whole inserted subtree so map entries for nested GUIDs
        # point at the newly inserted objects rather than at nothing or at
        # objects from a previously removed copy.
        self.traverse_and_map(child_obj)
        child_uuid = child_obj.metadata["sync"]["guid"]
        self._fire_hierarchy_changed(parent_uuid, "insert_child", child_uuid)
        return ("insert_child", child_obj)