Skip to content

Define an inbound connector

Implementation of inbound connectors is different from the official Camunda SDK.

Instead of automatically fetching Intermediate Catch Events from the Operate server, this runtime relies on a process instance activating the inbound connector explicitly using a service task.

Once activated, the runtime will create a new async task that will execute the connector logic and publish a message to Zeebe with the result.

Similar to outbound connectors, inbound connectors are defined as classes derived from InboundConnector.

import asyncio

from pydantic import Field

from python_camunda_sdk import InboundConnector


class SleepConnector(InboundConnector):
    duration: int = Field(description="Duration of sleep in seconds")

    async def run(self) -> bool:
        await asyncio.sleep(self.duration)
        return True

    class ConnectorConfig:
        name = "Sleep"
        type = 'sleep'
from .log import LogConnector
from .sleep import SleepConnector

Once started, this connector will sleep for a given duration and then publish a message to Zebee with a name and correlation key configured in Modeler.

Import the SleepConnector and add it to the runtime inbound_connectors.

from python_camunda_sdk import CamundaRuntime, InsecureConfig

from log import LogConnector
from sleep import SleepConnector


config = InsecureConfig(
    hostname="127.0.0.1"
)

runtime = CamundaRuntime(
    config=config,
    outbound_connectors=[LogConnector],
    inbound_connectors=[SleepConnector]
)

if __name__ == "__main__":
    runtime.start()
import asyncio

from pydantic import Field

from python_camunda_sdk import InboundConnector


class SleepConnector(InboundConnector):
    duration: int = Field(description="Duration of sleep in seconds")

    async def run(self) -> bool:
        await asyncio.sleep(self.duration)
        return True

    class ConnectorConfig:
        name = "Sleep"
        type = 'sleep'
from .log import LogConnector
from .sleep import SleepConnector

Restart the runtime.

$ python example/runtime.py
2023-08-03 21:09:30.829 | INFO     | python_camunda_sdk.runtime.runtime:main:114 - Loading LogConnector (log)
2023-08-03 21:09:30.829 | INFO     | python_camunda_sdk.runtime.runtime:main:114 - Loading Sleep (sleep)
2023-08-03 21:09:30.829 | INFO     | python_camunda_sdk.runtime.runtime:main:117 - Starting runtime

Now both LogConnector and SleepConnector are active.