Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions statemachine/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class CallbackGroup(IntEnum):
PREPARE = auto()
ENTER = auto()
EXIT = auto()
INVOKE = auto()
VALIDATOR = auto()
BEFORE = auto()
ON = auto()
Expand Down
29 changes: 29 additions & 0 deletions statemachine/engines/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ async def _exit_states( # type: ignore[override]
for info in ordered_states:
args, kwargs = await self._get_args_kwargs(info.transition, trigger_data)

if self._invoke is not None:
self._invoke.on_state_exiting(info.state)

if info.state is not None: # pragma: no branch
await self.sm._callbacks.async_call(
info.state.exit.key, *args, on_error=on_error, **kwargs
Expand Down Expand Up @@ -242,6 +245,10 @@ async def _enter_states( # noqa: C901
new_configuration=new_configuration,
)

# Track states with invocations for post-macrostep spawning
if self._invoke is not None:
self._invoke.on_state_entered(target)

# Handle final states
if target.final:
self._handle_final_state(target, on_entry_result)
Expand Down Expand Up @@ -301,6 +308,19 @@ async def _run_microstep(self, enabled_transitions, trigger_data): # pragma: no
except Exception as e:
self._handle_error(e, trigger_data)

async def enter_initial_configuration(self):
"""Enter the initial state configuration without starting the processing loop.

Async override of the base method for use by invoke's two-phase activation.
"""
if self.sm.current_state_value is not None:
return
from ..event import BoundEvent

trigger_data = BoundEvent("__initial__", _sm=self.sm).build_trigger(machine=self.sm)
transitions = self._initial_transitions(trigger_data)
await self._enter_states(transitions, trigger_data, OrderedSet(), OrderedSet())

async def activate_initial_state(self):
"""Activate the initial state.

Expand Down Expand Up @@ -358,6 +378,10 @@ async def processing_loop( # noqa: C901
took_events = True
await self._run_microstep(enabled_transitions, internal_event)

# Spawn invocations for states entered during this macrostep
if self._invoke is not None:
await self._invoke.spawn_pending_async(internal_event)

# Phase 2: remaining internal events
while not self.internal_queue.is_empty(): # pragma: no cover
internal_event = self.internal_queue.pop()
Expand All @@ -379,6 +403,11 @@ async def processing_loop( # noqa: C901
# transitions can be processed while we wait.
break

# Handle invoke-related event processing (forward, finalize, autoforward)
if self._invoke is not None:
if self._invoke.handle_external_event(external_event):
continue

logger.debug("External event: %s", external_event.event)

# Handle lazy initial state activation.
Expand Down
57 changes: 56 additions & 1 deletion statemachine/engines/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from ..transition import Transition

if TYPE_CHECKING:
from ..invoke import InvokeManager
from ..statemachine import StateChart

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -94,6 +95,7 @@ def __init__(self, sm: "StateChart"):
self.running = True
self._processing = Lock()
self._cache: Dict = {} # Cache for _get_args_kwargs results
self._invoke: "InvokeManager | None" = None # set by StateChart if needed

def empty(self): # pragma: no cover
return self.external_queue.is_empty()
Expand Down Expand Up @@ -185,6 +187,19 @@ def start(self):

BoundEvent("__initial__", _sm=self.sm).put()

def enter_initial_configuration(self):
"""Enter the initial state configuration without starting the processing loop.

Used by invoke to split activation into two phases: enter initial states
(which runs datamodel entry actions), then apply invoke params, then start
the processing loop.
"""
if self.sm.current_state_value is not None:
return
trigger_data = BoundEvent("__initial__", _sm=self.sm).build_trigger(machine=self.sm)
transitions = self._initial_transitions(trigger_data)
self._enter_states(transitions, trigger_data, OrderedSet(), OrderedSet())

def _initial_transitions(self, trigger_data):
empty_state = State()
configuration = self.sm._get_initial_configuration()
Expand Down Expand Up @@ -485,6 +500,9 @@ def _exit_states(
for info in ordered_states:
args, kwargs = self._get_args_kwargs(info.transition, trigger_data)

if self._invoke is not None:
self._invoke.on_state_exiting(info.state)

# Execute `onexit` handlers — same per-block error isolation as onentry.
if info.state is not None: # pragma: no branch
self.sm._callbacks.call(info.state.exit.key, *args, on_error=on_error, **kwargs)
Expand Down Expand Up @@ -552,10 +570,43 @@ def _add_state_to_configuration(self, target: State):
if not self.sm.atomic_configuration_update:
self.sm.configuration |= {target}

def _terminate_child_session(self):
"""Exit all active states (firing onexit handlers) and stop the engine.

Called when a child session (one with ``_invoke_session``) reaches a
top-level final state. Per SCXML spec, all active states have their
``onexit`` handlers fired (in reverse document order), but the final
configuration is preserved so that observers can see which final state
was reached.
"""
on_error = self._on_error_handler()
active_states = sorted(
self.sm.configuration,
key=lambda s: s.document_order,
reverse=True,
)
trigger_data = TriggerData(self.sm, event=None)
event_data = EventData(trigger_data=trigger_data, transition=None)
args, kwargs = event_data.args, event_data.extended_kwargs
for state in active_states:
if self._invoke is not None:
self._invoke.on_state_exiting(state)
self.sm._callbacks.call(state.exit.key, *args, on_error=on_error, **kwargs)
if self._invoke is not None:
self._invoke.on_terminate()
self.running = False

def _handle_final_state(self, target: State, on_entry_result: list):
"""Handle final state entry: queue done events. No direct callback dispatch."""
if target.parent is None:
self.running = False
if getattr(self.sm, "_invoke_session", None) is not None:
# Child session: fire onexit on all active states before terminating
# so that #_parent sends in <onexit> of final states are delivered
self._terminate_child_session()
else:
self.running = False
if self._invoke is not None:
self._invoke.on_terminate()
else:
parent = target.parent
grandparent = parent.parent
Expand Down Expand Up @@ -645,6 +696,10 @@ def _enter_states( # noqa: C901
new_configuration=new_configuration,
)

# Track states with invocations for post-macrostep spawning
if self._invoke is not None:
self._invoke.on_state_entered(target)

# Handle final states
if target.final:
self._handle_final_state(target, on_entry_result)
Expand Down
38 changes: 20 additions & 18 deletions statemachine/engines/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def processing_loop(self, caller_future=None): # noqa: C901
first_result = self._sentinel
try:
took_events = True
while took_events:
while took_events and not self.sm.is_terminated:
self.clear_cache()
took_events = False
# Execute the triggers in the queue in FIFO order until the queue is empty
Expand Down Expand Up @@ -107,11 +107,9 @@ def processing_loop(self, caller_future=None): # noqa: C901
took_events = True
self._run_microstep(enabled_transitions, internal_event)

# TODO: Invoke platform-specific logic
# for state in sorted(self.states_to_invoke, key=self.entry_order):
# for inv in sorted(state.invoke, key=self.document_order):
# self.invoke(inv)
# self.states_to_invoke.clear()
# Spawn invocations for states entered during this macrostep
if self._invoke is not None:
self._invoke.spawn_pending_sync(internal_event)

# Process remaining internal events before external events.
# Note: the macrostep loop above already drains the internal queue,
Expand All @@ -136,19 +134,12 @@ def processing_loop(self, caller_future=None): # noqa: C901
# transitions can be processed while we wait.
break

# Handle invoke-related event processing (forward, finalize, autoforward)
if self._invoke is not None:
if self._invoke.handle_external_event(external_event):
continue

logger.debug("External event: %s", external_event.event)
# # TODO: Handle cancel event
# if self.is_cancel_event(external_event):
# self.running = False
# return

# TODO: Invoke states
# for state in self.configuration:
# for inv in state.invoke:
# if inv.invokeid == external_event.invokeid:
# self.apply_finalize(inv, external_event)
# if inv.autoforward:
# self.send(inv.id, external_event)

enabled_transitions = self.select_transitions(external_event)
logger.debug("Enabled transitions: %s", enabled_transitions)
Expand All @@ -164,10 +155,21 @@ def processing_loop(self, caller_future=None): # noqa: C901
self.clear()
raise

# Per SCXML spec: process ONE external event per macrostep,
# then loop back to handle eventless transitions, internal
# events, and invoke spawning before the next external event.
break

else:
if not self.sm.allow_event_without_transition:
raise TransitionNotAllowed(external_event.event, self.sm.configuration)

# If no events were processed but there are pending events
# on the external queue (e.g., delayed timeouts in SCXML tests),
# keep the loop alive so child sessions can send events back.
if not took_events and not self.external_queue.is_empty():
took_events = True

finally:
self._processing.release()
return first_result if first_result is not self._sentinel else None
Expand Down
43 changes: 39 additions & 4 deletions statemachine/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,22 @@ def __repr__(self):
)

def is_same_event(self, *_args, event: "str | None" = None, **_kwargs) -> bool:
return self == event
if self == event:
return True
if event is not None:
return self._is_prefix_match(str(event))
return False

def _is_prefix_match(self, event_str: str) -> bool:
"""SCXML prefix matching with dot/underscore normalization.

``'done.invoke.x'`` matches ``'done.invoke.x.uuid'``.
"""
self_dot = str(self).replace("_", ".")
event_dot = event_str.replace("_", ".")
if self_dot == event_dot:
return True
return event_dot.startswith(self_dot + ".")

def _add_callback(self, callback, grouper: CallbackGroup, is_event=False, **kwargs):
if self._transitions is None:
Expand All @@ -121,17 +136,36 @@ def __get__(self, instance, owner):
return self
return BoundEvent(id=self.id, name=self.name, delay=self.delay, _sm=instance)

def put(self, *args, send_id: "str | None" = None, **kwargs):
def put(
self,
*args,
send_id: "str | None" = None,
invokeid: "str | None" = None,
**kwargs,
):
# The `__call__` is declared here to help IDEs knowing that an `Event`
# can be called as a method. But it is not meant to be called without
# an SM instance. Such SM instance is provided by `__get__` method when
# used as a property descriptor.
assert self._sm is not None
trigger_data = self.build_trigger(*args, machine=self._sm, send_id=send_id, **kwargs)
trigger_data = self.build_trigger(
*args,
machine=self._sm,
send_id=send_id,
invokeid=invokeid,
**kwargs,
)
self._sm._put_nonblocking(trigger_data, internal=self.internal)
return trigger_data

def build_trigger(self, *args, machine: "StateChart", send_id: "str | None" = None, **kwargs):
def build_trigger(
self,
*args,
machine: "StateChart",
send_id: "str | None" = None,
invokeid: "str | None" = None,
**kwargs,
):
if machine is None:
raise RuntimeError(_("Event {} cannot be called without a SM instance").format(self))

Expand All @@ -140,6 +174,7 @@ def build_trigger(self, *args, machine: "StateChart", send_id: "str | None" = No
machine=machine,
event=self,
send_id=send_id,
invokeid=invokeid,
args=args,
kwargs=kwargs,
)
Expand Down
16 changes: 12 additions & 4 deletions statemachine/event_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ class TriggerData:
Allow revoking a delayed :ref:`TriggerData` instance.
"""

invokeid: "str | None" = field(compare=False, default=None)
"""The invoke id of the child session that generated this event, if any."""

execution_time: float = field(default=0.0)
"""The time at which the :ref:`Event` should run."""

Expand Down Expand Up @@ -54,7 +57,7 @@ class EventData:
trigger_data: TriggerData
"""The :ref:`TriggerData` of the :ref:`event`."""

transition: "Transition"
transition: "Transition | None"
"""The :ref:`Transition` instance that was activated by the :ref:`Event`."""

state: "State" = field(init=False)
Expand All @@ -67,9 +70,14 @@ class EventData:
"""The destination :ref:`State` of the :ref:`transition`."""

def __post_init__(self):
self.state = self.transition.source
self.source = self.transition.source
self.target = self.transition.target
if self.transition is not None:
self.state = self.transition.source
self.source = self.transition.source
self.target = self.transition.target
else:
self.state = None # type: ignore[assignment]
self.source = None # type: ignore[assignment]
self.target = None # type: ignore[assignment]
self.machine = self.trigger_data.machine

@property
Expand Down
6 changes: 6 additions & 0 deletions statemachine/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ def add_from_attributes(cls, attrs): # noqa: C901
elif key.startswith("done_state_"):
suffix = key[len("done_state_") :]
event_id = f"{key} done.state.{suffix}"
elif key.startswith("done_invoke_"):
suffix = key[len("done_invoke_") :]
event_id = f"{key} done.invoke.{suffix}"
cls.add_event(event=Event(transitions=value, id=event_id, name=key))
elif isinstance(value, (Event,)):
if value._has_real_id:
Expand All @@ -264,6 +267,9 @@ def add_from_attributes(cls, attrs): # noqa: C901
elif key.startswith("done_state_"):
suffix = key[len("done_state_") :]
event_id = f"{key} done.state.{suffix}"
elif key.startswith("done_invoke_"):
suffix = key[len("done_invoke_") :]
event_id = f"{key} done.invoke.{suffix}"
else:
event_id = key
new_event = Event(
Expand Down
Loading