Events
Event #
Bases: DictLikeModel
Base class for all workflow events.
Events are light-weight, serializable payloads passed between steps. They support both attribute and mapping access to dynamic fields.
Examples:
Subclassing with typed fields:
from pydantic import Field
class CustomEv(Event):
score: int = Field(ge=0)
e = CustomEv(score=10)
print(e.score)
Source code in workflows/events.py
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 | |
InputRequiredEvent #
Bases: Event
Emitted when human input is required to proceed.
Automatically written to the event stream if returned from a step.
If returned from a step, it does not need to be consumed by other steps and will pass validation. It's expected that the caller will respond to this event and send back a HumanResponseEvent.
Use this directly or subclass it.
Typical flow: a step returns InputRequiredEvent, callers consume it from
the stream and send back a HumanResponseEvent.
Examples:
from workflows.events import InputRequiredEvent, HumanResponseEvent
class HITLWorkflow(Workflow):
@step
async def my_step(self, ev: StartEvent) -> InputRequiredEvent:
return InputRequiredEvent(prefix="What's your name? ")
@step
async def my_step(self, ev: HumanResponseEvent) -> StopEvent:
return StopEvent(result=ev.response)
Source code in workflows/events.py
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 | |
HumanResponseEvent #
Bases: Event
Carries a human's response for a prior input request.
If consumed by a step and not returned by another, it will still pass validation.
Examples:
from workflows.events import InputRequiredEvent, HumanResponseEvent
class HITLWorkflow(Workflow):
@step
async def my_step(self, ev: StartEvent) -> InputRequiredEvent:
return InputRequiredEvent(prefix="What's your name? ")
@step
async def my_step(self, ev: HumanResponseEvent) -> StopEvent:
return StopEvent(result=ev.response)
Source code in workflows/events.py
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 | |
StartEvent #
Bases: Event
Implicit entry event sent to kick off a Workflow.run().
Source code in workflows/events.py
151 152 | |
StopEvent #
Bases: Event
Terminal event that signals the workflow has completed.
The result property contains the return value of the workflow run. When a
custom stop event subclass is used, the workflow result is that event
instance itself.
Examples:
# default stop event: result holds the value
return StopEvent(result={"answer": 42})
Subclassing to provide a custom result:
```python class MyStopEv(StopEvent): pass
@step async def my_step(self, ctx: Context, ev: StartEvent) -> MyStopEv: return MyStopEv(result={"answer": 42})
Source code in workflows/events.py
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 | |
WorkflowTimedOutEvent #
Bases: StopEvent
Published when a workflow exceeds its configured timeout.
This event is published to the event stream when a workflow times out, allowing consumers to understand why the workflow ended before the WorkflowTimeoutError exception is raised.
Attributes:
| Name | Type | Description |
|---|
Examples:
async for event in handler.stream_events():
if isinstance(event, WorkflowTimedOutEvent):
print(f"Workflow timed out after {event.timeout}s")
print(f"Active steps: {event.active_steps}")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
timeout
|
float
|
|
required |
active_steps
|
list[str]
|
|
required |
Source code in workflows/events.py
212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 | |
WorkflowCancelledEvent #
Bases: StopEvent
Published when a workflow is cancelled by the user.
This event is published to the event stream when a workflow is cancelled via the handler or programmatically, allowing consumers to understand why the workflow ended before the WorkflowCancelledByUser exception is raised.
Examples:
async for event in handler.stream_events():
if isinstance(event, WorkflowCancelledEvent):
print("Workflow was cancelled by user")
Source code in workflows/events.py
236 237 238 239 240 241 242 243 244 245 246 247 248 249 | |
WorkflowFailedEvent #
Bases: StopEvent
Published when a workflow step fails permanently.
This event is published to the event stream when a step fails and all retries are exhausted, allowing consumers to understand why the workflow ended before the exception is raised.
Attributes:
| Name | Type | Description |
|---|
Examples:
async for event in handler.stream_events():
if isinstance(event, WorkflowFailedEvent):
print(f"Step '{event.step_name}' failed after {event.attempts} attempts")
print(f"Total time: {event.elapsed_seconds:.2f}s")
print(event.traceback)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
step_name
|
str
|
|
required |
exception_type
|
str
|
|
required |
exception_message
|
str
|
|
required |
traceback
|
str
|
|
required |
attempts
|
int
|
|
required |
elapsed_seconds
|
float
|
|
required |
Source code in workflows/events.py
252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 | |