Source code for freeiam.ldap._wrapper
# SPDX-FileCopyrightText: 2025 Florian Best
# SPDX-License-Identifier: MIT OR Apache-2.0
"""Data wrapper."""
from dataclasses import dataclass
from typing import Any, Self, TypeAlias
import ldap.controls
from freeiam.ldap.attr import Attributes
from freeiam.ldap.constants import ResponseType
from freeiam.ldap.dn import DN
LDAPControl: TypeAlias = ldap.controls.LDAPControl | ldap.controls.RequestControl | ldap.controls.ResponseControl
LDAPControlList: TypeAlias = list[LDAPControl]
LDAPResponseControlList: TypeAlias = list[ldap.controls.ResponseControl]
LDAPRequestControlList: TypeAlias = list[ldap.controls.RequestControl]
[docs]
@dataclass
class Controls:
"""The LDAP request controls."""
server: LDAPRequestControlList | None = None
client: LDAPRequestControlList | None = None
response: LDAPResponseControlList | None = None
[docs]
def get(self, control: LDAPControl) -> ldap.controls.ResponseControl | None:
"""Get the control from the list of response controls."""
for ctrl in self.response or []:
if ctrl.controlType == control.controlType:
return ctrl
return None
[docs]
@classmethod
def expand(cls, controls: Self | None) -> dict[str, LDAPRequestControlList | None]:
if controls is None:
return {}
return {'serverctrls': controls.server, 'clientctrls': controls.client}
[docs]
@classmethod
def append_server(cls, controls: Self | None, control: ldap.controls.LDAPControl | ldap.controls.RequestControl) -> Self:
ctrls = cls([]) if controls is None else controls
assert ctrls.server is not None # noqa: S101
ctrls.server.append(control)
return ctrls
[docs]
@classmethod
def set_server(cls, controls: Self | None, control: ldap.controls.LDAPControl | ldap.controls.RequestControl) -> Self:
ctrls = cls([]) if controls is None else controls
assert ctrls.server is not None # noqa: S101
ctrls.server = [ctrl for ctrl in ctrls.server if ctrl.controlType != control.controlType] + [control]
return ctrls
@dataclass
class _Response:
"""The raw response of ldapobject.result4()."""
type: ResponseType | None
"""The response protocol operation."""
data: list[tuple[str, dict[str, list[bytes]]]] | None
"""The response data for the corresponding operation."""
msgid: int | None
"""The unique message ID."""
ctrls: LDAPResponseControlList | None
"""The list of python-ldap decoded response controls."""
name: str | None = None
"""The OID (responseName) of a extended operation response."""
value: bytes | None = None
"""The raw ASN.1 encoded reponseValue of an extended operation response."""
def __post_init__(self) -> None:
if not isinstance(self.type, ResponseType | None):
self.type = ResponseType(self.type)
@dataclass
class Page:
"""A page of a paginated search result."""
page: int
"""The current page number (starting at one)."""
entry: int
"""The number of the current entry on this page (starting at one)."""
page_size: int
"""The number of entries per page."""
results: int | None = None
"""The total number of search results."""
last_page: int | None = None
"""The last page of all search results."""
@property
def is_last_in_page(self) -> bool:
"""Whether this is the last entry on the current page."""
return self.page_size == self.entry
@dataclass
class Result:
"""The wrapped result of an operation. Allows accessing response controls."""
dn: DN | None
"""The new or unchanged DN of the object."""
attr: Attributes | None
"""The result LDAP attributes, if the operation provides some."""
controls: Controls | None
"""LDAP response controls."""
_response: _Response
"""The raw LDAP result."""
page: Page | None = None
"""The page of a paginated search result."""
extended_value: Any = None
"""The decoded response value of an extended response."""
@classmethod
def from_response(
cls, dn: DN | str | None, attr: dict[str, list[bytes]] | None, controls: Controls | None, response: _Response, **kwargs: Any
) -> Self:
dn = dn if dn is None else DN.get(dn)
attrs = attr if attr is None else Attributes(attr)
return cls(dn, attrs, cls._control_response(controls, response.ctrls), response, **kwargs)
@classmethod
def set_controls(cls, response: _Response, controls: Controls | None) -> None:
if controls is None:
return
controls.response = response.ctrls
@classmethod
def _control_response(cls, controls: Controls | None, response_ctrls: LDAPResponseControlList | None) -> Controls:
if controls is None:
return Controls(None, None, response_ctrls)
return Controls(controls.server and controls.server.copy(), controls.client and controls.client.copy(), response_ctrls)