Skip to content

outbound

Outbound connectors handle tasks from Zeebe.

Example:

from pydantic import BaseModel, Field
from loguru import logger

from python_camunda_sdk import OutboundConnector


# Result model returned by the example connector's `run` method.
class StatusModel(BaseModel):
    # Outcome of the run; the example below always sets it to "ok".
    status: str


# Example outbound connector: logs the configured message and reports "ok".
class LogConnector(OutboundConnector):
    # Input field of the connector; the description is attached as
    # pydantic field metadata.
    message: str = Field(description="Message to log")

    async def run(self) -> StatusModel:
        # Write the message to the log at INFO level.
        logger.info(f"LogConnector: {self.message}")

        # The example always reports success with a fixed status value.
        return StatusModel(status="ok")

    # Connector metadata read by the SDK.
    # NOTE(review): presumably `type` is the Zeebe job type this connector
    # handles and `name` is its display name - confirm against the SDK docs.
    class ConnectorConfig:
        name = "LogConnector"
        type = "log"

OutboundConnector

Bases: Connector

Base class for outbound connectors.

Source code in python_camunda_sdk/connectors/outbound.py
class OutboundConnector(Connector, base_config_cls=OutboundConnectorConfig):
    """Base class for outbound connectors."""

    @classmethod
    def to_task(
        cls, client: ZeebeClient
    ) -> Coroutine[..., Optional[Union[BaseModel, SimpleTypes]]]:
        """Converts connector class into a pyzeebe task function.

        Args:
            client: Zeebe client. It is not referenced in the body of
                this method.

        Returns:
            An async task function that validates arguments and executes
                the connector logic. The function re-raises
                ``ValidationError`` when the job variables do not
                validate against the connector's fields.
        """

        async def task(job: Job, **kwargs) -> Union[BaseModel, SimpleTypes]:
            # Instantiating the connector validates the job variables
            # against the fields declared on the connector class.
            try:
                connector = cls(**kwargs)
            except ValidationError:
                logger.exception(
                    f"Failed to validate arguments for {cls.config.name}"
                )
                # Bare ``raise`` re-raises the active exception without
                # adding an extra traceback entry for this line.
                raise

            return await connector._execute(job=job)

        return task

to_task(client) classmethod

Converts connector class into a pyzeebe task function.

Returns:

Type Description
Coroutine[..., Optional[Union[BaseModel, SimpleTypes]]]

An async task function that validates arguments and executes the connector logic.

Source code in python_camunda_sdk/connectors/outbound.py
@classmethod
def to_task(
    cls, client: ZeebeClient
) -> Coroutine[..., Optional[Union[BaseModel, SimpleTypes]]]:
    """Converts connector class into a pyzeebe task function.

    Args:
        client: Zeebe client. It is not referenced in the body of
            this method.

    Returns:
        An async task function that validates arguments and executes
            the connector logic. The function re-raises
            ``ValidationError`` when the job variables do not validate
            against the connector's fields.
    """

    async def task(job: Job, **kwargs) -> Union[BaseModel, SimpleTypes]:
        # Instantiating the connector validates the job variables
        # against the fields declared on the connector class.
        try:
            connector = cls(**kwargs)
        except ValidationError:
            logger.exception(
                f"Failed to validate arguments for {cls.config.name}"
            )
            # Bare ``raise`` re-raises the active exception without
            # adding an extra traceback entry for this line.
            raise

        return await connector._execute(job=job)

    return task

run() abstractmethod async

The main connector method that must be overridden by subclasses.

Source code in python_camunda_sdk/connectors/connector.py
@abstractmethod
async def run(self) -> None:
    """Execute the connector's logic.

    Subclasses must provide their own implementation; the base version
    does no work.

    Raises:
        NotImplementedError: Always, when the base implementation is
            awaited.
    """
    raise NotImplementedError()