inbound
Inbound connectors send messages to Zeebe.
Upon a call, an inbound connector starts an async task that runs the coroutine defined in its `run` method and immediately returns `None` to Zeebe.
As soon as the async task completes, the connector will publish a
message with the predefined name and correlation key.
Example:
| import asyncio
from pydantic import Field
from python_camunda_sdk import InboundConnector
class SleepConnector(InboundConnector):
    # Example inbound connector: waits for the configured number of seconds
    # and then reports completion.
    duration: int = Field(description="Duration of sleep in seconds")

    async def run(self) -> bool:
        """Sleep for ``duration`` seconds, then return ``True``."""
        seconds = self.duration
        await asyncio.sleep(seconds)
        return True

    # Connector configuration consumed by the SDK.
    class ConnectorConfig:
        name = "Sleep"
        type = "sleep"
|
InboundConnector
Bases: Connector
Inbound connector base class.
Source code in python_camunda_sdk/connectors/inbound.py
class InboundConnector(Connector, base_config_cls=InboundConnectorConfig):
    """Inbound connector base class.

    The pyzeebe task built by :meth:`to_task` validates the job variables,
    schedules the connector logic as a background asyncio task and returns
    immediately. Once the background task finishes, its result is published
    to Zeebe as a message.
    """

    async def _execute(
        self,
        job: Job,
        client: ZeebeClient,
        correlation_key: str,
        message_name: str,
    ) -> None:
        """Run the connector and publish its result as a Zeebe message.

        Args:
            job: The job that triggered this connector; forwarded to the
                parent class's ``_execute``.
            client: Client used to publish the resulting message.
            correlation_key: Correlation key of the published message.
            message_name: Name of the published message.
        """
        variables = await super()._execute(job=job)
        await client.publish_message(
            name=message_name,
            correlation_key=correlation_key,
            variables=variables,
        )

    @classmethod
    def to_task(
        cls, client: ZeebeClient
    ) -> Coroutine[..., Optional[Union[BaseModel, SimpleTypes]]]:
        """Converts connector class into a pyzeebe task function.

        Args:
            client: Client the background task uses to publish its message.

        Returns:
            A coroutine function that validates arguments, schedules the
            connector logic in the background and returns ``None``.
        """
        # The event loop keeps only weak references to tasks, so an
        # unreferenced task may be garbage-collected before it finishes.
        # Hold a strong reference here until each task is done.
        pending = set()

        def _on_done(finished) -> None:
            """Drop the finished task and surface any failure in the log."""
            pending.discard(finished)
            if finished.cancelled():
                return
            error = finished.exception()
            if error is not None:
                logger.error(
                    "Inbound connector %s failed",
                    cls.config.name,
                    exc_info=error,
                )

        async def task(
            job: Job, correlation_key: str, message_name: str, **kwargs
        ) -> None:
            try:
                connector = cls(correlation_key=correlation_key, **kwargs)
            except ValidationError:
                logger.exception(
                    f"Failed to validate arguments for {cls.config.name}"
                )
                raise

            # We are inside a coroutine, so a running loop always exists.
            background = asyncio.get_running_loop().create_task(
                connector._execute(
                    job=job,
                    client=client,
                    correlation_key=correlation_key,
                    message_name=message_name,
                )
            )
            pending.add(background)
            background.add_done_callback(_on_done)

        return task
|
to_task(client)
classmethod
Converts connector class into a pyzeebe task function.
Returns:
| Type | Description |
| --- | --- |
| Coroutine[..., Optional[Union[BaseModel, SimpleTypes]]] | A coroutine that validates arguments and executes the connector logic. |
Source code in python_camunda_sdk/connectors/inbound.py
@classmethod
def to_task(
    cls, client: ZeebeClient
) -> Coroutine[..., Optional[Union[BaseModel, SimpleTypes]]]:
    """Converts connector class into a pyzeebe task function.

    Args:
        client: Client passed to the connector's ``_execute`` when the
            background task runs.

    Returns:
        A coroutine function that validates arguments, schedules the
        connector logic in the background and returns ``None``.
    """
    # The event loop keeps only weak references to tasks, so an
    # unreferenced task may be garbage-collected before it finishes.
    # Hold a strong reference here until each task is done.
    pending = set()

    def _on_done(finished) -> None:
        """Drop the finished task and surface any failure in the log."""
        pending.discard(finished)
        if finished.cancelled():
            return
        error = finished.exception()
        if error is not None:
            logger.error(
                "Inbound connector %s failed",
                cls.config.name,
                exc_info=error,
            )

    async def task(
        job: Job, correlation_key: str, message_name: str, **kwargs
    ) -> None:
        try:
            connector = cls(correlation_key=correlation_key, **kwargs)
        except ValidationError:
            logger.exception(
                f"Failed to validate arguments for {cls.config.name}"
            )
            raise

        # We are inside a coroutine, so a running loop always exists.
        background = asyncio.get_running_loop().create_task(
            connector._execute(
                job=job,
                client=client,
                correlation_key=correlation_key,
                message_name=message_name,
            )
        )
        pending.add(background)
        background.add_done_callback(_on_done)

    return task
|
run()
abstractmethod
async
The main connector method that must be overridden by
subclasses.
Source code in python_camunda_sdk/connectors/connector.py
@abstractmethod
async def run(self) -> None:
    """Execute the connector's main logic.

    Subclasses must override this method; the base implementation only
    raises.

    Raises:
        NotImplementedError: Always, when the base implementation is awaited.
    """
    raise NotImplementedError()
|