class ConnectorMetaclass(ModelMetaclass):
    """Metaclass that validates connector class definitions.

    When a connector class is created, the metaclass builds its ``config``
    attribute from the nested ``ConnectorConfig`` class and checks that a
    concrete, properly annotated ``run`` method is present.
    """

    # Any exception raised while the class is being built is handed to
    # ``logger.catch`` with the message below and, because ``reraise=True``
    # is passed, is expected to propagate to the caller afterwards.
    @logger.catch(reraise=True, message="Invalid connector definition")
    def __new__(
        mcs,
        cls_name,
        bases,
        namespace,
        base_config_cls=ConnectorConfig,
        **kwargs,
    ):
        """Create the class and run the definition checks on it.

        Args:
            mcs: This metaclass.
            cls_name: Name of the class being created.
            bases: Base classes of the class being created.
            namespace: Class body namespace.
            base_config_cls: Class used to build the ``config`` attribute;
                stored on the new class as ``_base_config_cls``.
            **kwargs: Extra class keywords forwarded to the parent metaclass.

        Returns:
            The newly created class.
        """
        cls = super().__new__(mcs, cls_name, bases, namespace, **kwargs)
        cls._base_config_cls = base_config_cls
        # Classes whose bases are exactly ``(BaseModel,)`` or ``(Connector,)``
        # are not validated. NOTE(review): presumably these are the framework
        # base classes rather than user connectors -- confirm.
        # ``Connector`` is only evaluated when the first comparison is true,
        # so it does not need to exist while ``Connector`` itself is created.
        if bases != (BaseModel,) and bases != (Connector,):
            cls._generate_config()
            cls._check_run_method()
            cls._check_return_annotation()
            cls._extra_pre_init_checks()
        return cls

    def _generate_config(cls) -> None:
        """Build ``cls.config`` from the ``ConnectorConfig`` class attribute.

        For every field declared on ``cls._base_config_cls`` the value of the
        same-named attribute of ``cls.ConnectorConfig`` is collected (fields
        without such an attribute are skipped), and the collected values are
        used to instantiate ``cls._base_config_cls``. The resulting instance
        is stored as ``cls.config``.

        Raises:
            AttributeError: if the class has no ``ConnectorConfig`` attribute.
        """
        config_cls = getattr(cls, "ConnectorConfig", None)
        if config_cls is None:
            raise AttributeError(f"{cls} is missing ConnectorConfig")
        data = {
            field_name: getattr(config_cls, field_name)
            for field_name in cls._base_config_cls.model_fields
            if hasattr(config_cls, field_name)
        }
        cls.config = cls._base_config_cls(**data)

    def _check_run_method(cls) -> None:
        """Ensure the class has a concrete ``run`` method.

        Raises:
            AttributeError: if ``run`` is missing or is still flagged as
                abstract.
        """
        run_method = getattr(cls, "run", None)
        # Test the value of the flag rather than its mere presence: some
        # objects (e.g. ``property``) always expose ``__isabstractmethod__``
        # and report ``False`` when they are concrete.
        if run_method is None or getattr(
            run_method, "__isabstractmethod__", False
        ):
            raise AttributeError(
                "Connector must have a run(self, config) method defined."
                f" {cls} appears to have no run() method"
            )

    def _check_return_annotation(cls) -> None:
        """Validate the return annotation of ``run`` and record it.

        The annotation must be ``None``/``NoneType``, a ``BaseModel``
        subclass, or one of the types listed in ``SimpleTypes``. On success
        it is stored as ``cls._return_type``; a ``-> None`` annotation is
        stored as ``NoneType``.

        Raises:
            AttributeError: if ``run`` has no return annotation or the
                annotation is not one of the accepted types.
        """
        signature = inspect.signature(cls.run)
        return_annotation = signature.return_annotation
        if return_annotation is signature.empty:
            raise AttributeError(
                "Connector that return nothing must be annotated with"
                " -> None."
                f" {cls} return nothing."
            )
        # ``inspect.signature`` reports ``-> None`` as the ``None`` object
        # itself, not as ``NoneType``; normalize so both spellings pass.
        if return_annotation is None:
            return_annotation = NoneType
        # ``issubclass`` raises TypeError for annotations that are not plain
        # classes (generic aliases, unions, strings); such annotations are
        # simply not models and fall through to the remaining checks.
        try:
            is_model = issubclass(return_annotation, BaseModel)
        except TypeError:
            is_model = False
        if (
            return_annotation != NoneType
            and not is_model
            and return_annotation not in get_args(SimpleTypes)
        ):
            raise AttributeError(
                f"Connector returns invalid type ({return_annotation})"
            )
        cls._return_type = return_annotation

    def _extra_pre_init_checks(cls) -> None:
        """Hook for additional checks; does nothing in this metaclass."""
        pass