Skip to content

inbound

Inbound connectors send messages to Zeebe.

When invoked, an inbound connector schedules an asyncio task that runs the coroutine defined in its run method, and immediately returns None to Zeebe without waiting for that task to finish.

As soon as the async task completes, the connector publishes a message to Zeebe using the message name and correlation key that were passed to the task, with the variables produced by the connector attached.

Example:

import asyncio

from pydantic import Field

from python_camunda_sdk import InboundConnector


class SleepConnector(InboundConnector):
    # Example inbound connector: waits for a configurable number of
    # seconds and then reports success.

    class ConnectorConfig:
        # Human-readable connector name and its task type.
        name = "Sleep"
        type = "sleep"

    # How long the connector should wait before completing.
    duration: int = Field(description="Duration of sleep in seconds")

    async def run(self) -> bool:
        # Suspend without blocking the event loop, then signal success.
        await asyncio.sleep(self.duration)
        return True

InboundConnector

Bases: Connector

Inbound connector base class.

Source code in python_camunda_sdk/connectors/inbound.py
class InboundConnector(Connector, base_config_cls=InboundConnectorConfig):
    """Inbound connector base class.

    The pyzeebe task produced by :meth:`to_task` does not wait for the
    connector to finish. It schedules the connector as a background
    asyncio task, and the outcome is delivered by publishing a message
    once that background task completes.
    """

    async def _execute(
        self,
        job: Job,
        client: ZeebeClient,
        correlation_key: str,
        message_name: str,
    ):
        """Runs the connector and publishes the outcome as a message.

        Args:
            job: The job being handled; forwarded to the parent
                ``_execute``.
            client: Client used to publish the message.
            correlation_key: Correlation key of the published message.
            message_name: Name of the published message.
        """
        variables = await super()._execute(job=job)
        await client.publish_message(
            name=message_name,
            correlation_key=correlation_key,
            variables=variables,
        )

    @classmethod
    def to_task(
        cls, client: ZeebeClient
    ) -> Coroutine[..., Optional[Union[BaseModel, SimpleTypes]]]:
        """Converts connector class into a pyzeebe task function.

        Args:
            client: Client used to publish the message once the connector
                has finished.

        Returns:
            An async function that validates arguments and schedules the
                connector logic as a background asyncio task. The
                function itself returns ``None``.
        """
        # The event loop only keeps weak references to tasks. Holding a
        # strong reference here prevents a running connector from being
        # garbage-collected before it publishes its message.
        background_tasks = set()

        async def task(
            job: Job, correlation_key: str, message_name: str, **kwargs
        ) -> None:
            try:
                connector = cls(correlation_key=correlation_key, **kwargs)
            except ValidationError:
                logger.exception(
                    "Failed to validate arguments for " f"{cls.config.name}"
                )
                # Bare raise keeps the original traceback intact.
                raise

            # We are inside a coroutine, so a loop is guaranteed to be
            # running; create_task schedules on that loop.
            background = asyncio.create_task(
                connector._execute(
                    job=job,
                    client=client,
                    correlation_key=correlation_key,
                    message_name=message_name,
                )
            )
            background_tasks.add(background)
            # Drop the reference once the task is done so the set does not
            # grow without bound.
            background.add_done_callback(background_tasks.discard)

        return task

to_task(client) classmethod

Converts connector class into a pyzeebe task function.

Returns:

Type Description
Coroutine[..., Optional[Union[BaseModel, SimpleTypes]]]

An async function that validates the job arguments and schedules the connector logic as a background task.

Source code in python_camunda_sdk/connectors/inbound.py
@classmethod
def to_task(
    cls, client: ZeebeClient
) -> Coroutine[..., Optional[Union[BaseModel, SimpleTypes]]]:
    """Converts connector class into a pyzeebe task function.

    Args:
        client: Client used to publish the message once the connector
            has finished.

    Returns:
        An async function that validates arguments and schedules the
            connector logic as a background asyncio task. The function
            itself returns ``None``.
    """
    # The event loop only keeps weak references to tasks. Holding a
    # strong reference here prevents a running connector from being
    # garbage-collected before it publishes its message.
    background_tasks = set()

    async def task(
        job: Job, correlation_key: str, message_name: str, **kwargs
    ) -> None:
        try:
            connector = cls(correlation_key=correlation_key, **kwargs)
        except ValidationError:
            logger.exception(
                "Failed to validate arguments for " f"{cls.config.name}"
            )
            # Bare raise keeps the original traceback intact.
            raise

        # We are inside a coroutine, so a loop is guaranteed to be
        # running; create_task schedules on that loop.
        background = asyncio.create_task(
            connector._execute(
                job=job,
                client=client,
                correlation_key=correlation_key,
                message_name=message_name,
            )
        )
        background_tasks.add(background)
        # Drop the reference once the task is done so the set does not
        # grow without bound.
        background.add_done_callback(background_tasks.discard)

    return task

run() abstractmethod async

The main connector method that must be overridden by subclasses.

Source code in python_camunda_sdk/connectors/connector.py
@abstractmethod
async def run(self) -> None:
    """Execute the connector's logic.

    Concrete connectors must override this coroutine; this base
    version does nothing except raise ``NotImplementedError``.
    """
    raise NotImplementedError()