Async Client für Azure DevOps Server

Stellt hier eure Projekte vor.
Internetseiten, Skripte, und alles andere bzgl. Python.
Antworten
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Hallo zusammen,

für einen Hobby-Projekt habe ich mir vorgenommen einen Client für die Azure DevOps Rest Api zu schreiben (jedenfalls soweit, dass man die wichtigsten Funktionen verwenden kann)
Es gibt bereits einen OpenSource Client von Microsoft, welcher auf dem msrest package - auch von Microsoft - basiert:
https://github.com/microsoft/azure-devops-python-api
https://github.com/Azure/msrest-for-python

Erstens gefiel mir die Umsetzung von Microsoft aber nicht wirklich und zweitens habe ich Zweifel, wie "async" das Ganze ist.
Das heißt natürlich bei weitem nicht, dass ich es besser könnte, ... aber wenn man es selbst mal versucht, kann man dabei ja was lernen...

Ein paar Anforderungen:
- Komplett async
- Der Client soll sich stark am Aufbau der API orientieren, mit der recht guten Dokumentation von Microsoft sollte die Benutzung des Clients dann ziemlich intuitiv sein.
- Die API - Dokumentation weicht teilweise von der Realität ab und die Anzahl der Parameter ist groß. Durch Type-Annotations soll die Benutzung vereinfacht werden.
- Alle empfangenen JSON Objekte sollen in Klassen deserialisiert werden.

Aktuell kann man den Aufbau schon erkennen.
Man kann Projekte anlegen, löschen und auflisten.
Das Hinzufügen weiterer Funktionen sollte eigentlich nur noch (viel) Fleißarbeit sein.

Ich frage mich aber ob man das Konzept noch verbessern kann. An einigen Stellen fand ich meine Umsetzung etwas umständlich. Bisher habe ich aber nichts besseres gefunden.
Für Feedback wäre ich sehr dankbar! Es ist etwas lang, daher schonmal Danke für's lesen!
(Dass sich alles in einem Modul befindet liegt nur daran, dass es leichter zu posten ist)

Code: Alles auswählen

"""
Async Client for the Azure DevOps Rest Api

Documentation:
https://docs.microsoft.com/en-us/rest/api/azure/devops/core/projects?view=azure-devops-server-rest-5.0
"""
import asyncio
import time
from functools import lru_cache
from dataclasses import dataclass, field
from enum import Enum
from types import TracebackType
from typing import List, Optional, Type
import aiohttp
from aiohttp.helpers import BasicAuth

USER = "roger"
PAT = "izgaygo5xldxwoms2xy447lqy7trhtauqlprsi7k6y3u4uyptd2a"
INSTANCE = "roger-pc"
COLLECTION = "DefaultCollection"
STANDRAD_CAPABILITIES = {
    "versioncontrol": {"sourceControlType": "Git"},
    "processTemplate": {"templateTypeId": "6b724908-ef14-45cf-84f8-768b5384da45"},
}


def snakify_str(string: str) -> str:
    if string.startswith("_"):
        string = string[1:]
    new_letters = []
    for letter in string:
        if letter.isupper():
            new_letters.append(f"_{letter.lower()}")
        else:
            new_letters.append(letter)
    return "".join(new_letters)


def snakify_args(arguments: dict) -> dict:
    """
    since the api returns fields in lower-camel-case
    they need to be converted to pythonic snake-case names
    """
    return {snakify_str(key): value for key, value in arguments.items()}


class ProjectState(Enum):
    all = "all"
    create_pending = "createPending"
    deleted = "deleted"
    deleting = "deleting"
    new = "new"
    unchanged = "unchanged"
    well_formed = "wellFormed"


class ProjectVisibility(Enum):
    private = "private"
    public = "public"


class OperationStatus(Enum):
    cancelled = "cancelled"
    failed = "failed"
    in_progress = "inProgress"
    not_set = "notSet"
    queued = "queued"
    succeeded = "succeeded"


@dataclass
class TeamProjectReference:
    id: str
    name: str
    url: str
    state: ProjectState
    revision: int
    visibility: ProjectVisibility
    last_update_time: str
    description: str = field(default=None)


@dataclass
class ReferenceLinks:
    links: dict


@dataclass
class WebApiTeamRef:
    id: str
    name: str
    url: str


@dataclass
class TeamProject:
    id: str
    name: str
    url: str
    state: ProjectState
    revision: int
    links: ReferenceLinks
    visibility: ProjectVisibility
    default_team: WebApiTeamRef
    last_update_time: str
    description: str
    capabilities: dict = field(default=None)


@dataclass
class OperationReference:
    id: str
    status: OperationStatus
    url: str


class BaseResource:
    """Baseclass for all Services and Resources"""

    def __init__(self) -> None:
        self._session = BaseResource.get_session()
        self._base_url = f"http://{INSTANCE}/{COLLECTION}/_apis"

    @staticmethod
    @lru_cache
    def get_session() -> aiohttp.ClientSession:
        return aiohttp.ClientSession(auth=BasicAuth(USER, PAT))

    async def __aenter__(self) -> "BaseResource":
        return self._session

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        await self._session.close()

    async def await_status(self, url: str, status: OperationStatus, timeout: int = None) -> None:
        start_time = time.time()
        while True:
            response = await self._session.get(url)
            content = await response.json()
            if content["status"] == status.value:
                return
            if timeout and time.time() - start_time > timeout:
                print("Timeout")
                return
            await asyncio.sleep(1)


class Projects(BaseResource):
    def __init__(self) -> None:
        super().__init__()
        self._resource_url = f"{self._base_url}/projects"

    async def list(
        self,
        skip: int = None,
        top: int = None,
        continuation_token: str = None,
        get_default_team_image_url: bool = None,
        state_filter: ProjectState = None,
    ) -> List[TeamProjectReference]:
        url = self._resource_url
        params = {
            "$skip": skip,
            "$top": top,
            "continuationToken": continuation_token,
            "getDefaultTeamImageUrl": get_default_team_image_url,
            "stateFilter": state_filter,
            "api_version": "5.0",
        }
        params = {key: value for key, value in params.items() if value is not None}
        async with self._session.get(url, params=params) as response:
            content = await response.json()
            return [TeamProjectReference(**snakify_args(element)) for element in content["value"]]

    async def get(self, project_id: str, include_capabilities: bool = None, include_history: bool = None) -> TeamProject:
        url = f"{self._resource_url}/{project_id}"
        params = {"includeCapabilities": include_capabilities, "includeHistory": include_history}
        async with self._session.get(url, params=params) as response:
            content = await response.json()
        return TeamProject(**snakify_args(content))

    async def create(self, name: str, capabilities: dict, description: str = None, url: str = None) -> None:
        url = self._resource_url
        params = {"api-version": "5.0"}
        payload = {"name": name, "description": description, "capabilities": capabilities}
        async with self._session.post(url, params=params, json=payload) as response:
            content = await response.json()
        operation_reference = OperationReference(**content)
        await self.await_status(operation_reference.url, OperationStatus.succeeded, timeout=30)

    async def delete(self, project_id: str) -> None:
        url = f"{self._resource_url}/{project_id}"
        params = {"api-version": "5.0"}
        async with self._session.delete(url, params=params) as response:
            content = await response.json()
        operation_reference = OperationReference(**content)
        await self.await_status(operation_reference.url, OperationStatus.succeeded, timeout=30)


class Core(BaseResource):
    def __init__(self) -> None:
        super().__init__()
        # Core resources
        self.projects = Projects()
        # self.processes = Processes()
        # self.teams = Teams()


class AsyncClient:
    """Client class to give access to all services and resources"""

    def __init__(self) -> None:
        # services
        # self.build = Build()
        self.core = Core()
        # self.dashboard = Dashboard()
        # ...


async def main():
    client = AsyncClient()

    for project in await client.core.projects.list():
        if project.name == "TestProject":
            await client.core.projects.delete(project.id)

    await client.core.projects.create(name="TestProject", capabilities=STANDRAD_CAPABILITIES, description="TestDescription")

    for project in await client.core.projects.list():
        if project.name == "TestProject":
            await client.core.projects.delete(project.id)


asyncio.run(main())
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Bei der ersten Version ist wurde die Session nicht geschlossen, Oops.

Dafür musste ich noch mal etwas umstellen.
Im Beispiel kann man jetzt 10 Projekte parallel anlegen, aber mein Server ist irgendwie falsch konfiguriert, so dass es trotzdem etwa eine Minute dauert bis alle angelegt und wieder gelöscht sind.

Code: Alles auswählen

"""
Async Azure DevOps Client

Microssoft Rest API documentation:
https://docs.microsoft.com/en-us/rest/api/azure/devops/core/projects?view=azure-devops-server-rest-5.0
"""
import asyncio
import time
from dataclasses import dataclass, field
from enum import Enum
from types import TracebackType
from typing import List, Optional, Type
import aiohttp
from aiohttp.helpers import BasicAuth

USER = "roger"
PAT = "izgaygo5xldxwoms2xy447lqy7trhtauqlprsi7k6y3u4uyptd2a"
INSTANCE = "roger-pc"
COLLECTION = "DefaultCollection"
STANDRAD_CAPABILITIES = {
    "versioncontrol": {"sourceControlType": "Git"},
    "processTemplate": {"templateTypeId": "6b724908-ef14-45cf-84f8-768b5384da45"},
}


def snakify_str(string: str) -> str:
    if string.startswith("_"):
        string = string[1:]
    new_letters = []
    for letter in string:
        if letter.isupper():
            new_letters.append(f"_{letter.lower()}")
        else:
            new_letters.append(letter)
    return "".join(new_letters)


def snakify_args(arguments: dict) -> dict:
    """
    since the api returns fields in lower-camel-case
    they need to be converted to pythonic snake-case names
    """
    return {snakify_str(key): value for key, value in arguments.items()}


class ProjectState(Enum):
    all = "all"
    create_pending = "createPending"
    deleted = "deleted"
    deleting = "deleting"
    new = "new"
    unchanged = "unchanged"
    well_formed = "wellFormed"


class ProjectVisibility(Enum):
    private = "private"
    public = "public"


class OperationStatus(Enum):
    cancelled = "cancelled"
    failed = "failed"
    in_progress = "inProgress"
    not_set = "notSet"
    queued = "queued"
    succeeded = "succeeded"


@dataclass
class TeamProjectReference:
    id: str
    name: str
    url: str
    state: ProjectState
    revision: int
    visibility: ProjectVisibility
    last_update_time: str
    description: str = field(default=None)


@dataclass
class ReferenceLinks:
    links: dict


@dataclass
class WebApiTeamRef:
    id: str
    name: str
    url: str


@dataclass
class TeamProject:
    id: str
    name: str
    url: str
    state: ProjectState
    revision: int
    links: ReferenceLinks
    visibility: ProjectVisibility
    default_team: WebApiTeamRef
    last_update_time: str
    description: str
    capabilities: dict = field(default=None)


@dataclass
class OperationReference:
    id: str
    status: OperationStatus
    url: str


async def await_status(session, url: str, status: OperationStatus, timeout: int = None) -> None:
    start_time = time.time()
    while True:
        response = await session.get(url)
        content = await response.json()
        if content["status"] == status.value:
            return
        if timeout and time.time() - start_time > timeout:
            print("Timeout")
            return
        await asyncio.sleep(1)


class Projects:
    def __init__(self, session) -> None:
        self._base_url = f"http://{INSTANCE}/{COLLECTION}/_apis"
        self._session = session
        self._resource_url = f"{self._base_url}/projects"

    async def list(
        self,
        skip: int = None,
        top: int = None,
        continuation_token: str = None,
        get_default_team_image_url: bool = None,
        state_filter: ProjectState = None,
    ) -> List[TeamProjectReference]:
        url = self._resource_url
        params = {
            "$skip": skip,
            "$top": top,
            "continuationToken": continuation_token,
            "getDefaultTeamImageUrl": get_default_team_image_url,
            "stateFilter": state_filter,
            "api_version": "5.0",
        }
        params = {key: value for key, value in params.items() if value is not None}
        async with self._session.get(url, params=params) as response:
            content = await response.json()
            return [TeamProjectReference(**snakify_args(element)) for element in content["value"]]

    async def get(self, project_id: str, include_capabilities: bool = None, include_history: bool = None) -> TeamProject:
        url = f"{self._resource_url}/{project_id}"
        params = {"includeCapabilities": include_capabilities, "includeHistory": include_history}
        async with self._session.get(url, params=params) as response:
            content = await response.json()
        return TeamProject(**snakify_args(content))

    async def create(self, name: str, capabilities: dict, description: str = None, url: str = None) -> None:
        url = self._resource_url
        params = {"api-version": "5.0"}
        payload = {"name": name, "description": description, "capabilities": capabilities}
        async with self._session.post(url, params=params, json=payload) as response:
            content = await response.json()
        operation_reference = OperationReference(**content)
        await await_status(self._session, operation_reference.url, OperationStatus.succeeded, timeout=120)

    async def delete(self, project_id: str) -> None:
        url = f"{self._resource_url}/{project_id}"
        params = {"api-version": "5.0"}
        async with self._session.delete(url, params=params) as response:
            content = await response.json()
        operation_reference = OperationReference(**content)
        await await_status(self._session, operation_reference.url, OperationStatus.succeeded, timeout=120)


class Core:
    def __init__(self, session) -> None:
        self._base_url = f"http://{INSTANCE}/{COLLECTION}/_apis"
        # Core resources
        self._session = session
        self.projects = Projects(self._session)
        # self.processes = Processes()
        # self.teams = Teams()


class AsyncClient:
    """Client class to give access to all services and resources"""

    def __init__(self) -> None:
        self._session = aiohttp.ClientSession(auth=BasicAuth(USER, PAT))
        # services
        # self.build = Build()
        self.core = Core(self._session)
        # self.dashboard = Dashboard()
        # ...

    async def close(self) -> None:
        await self._session.close()

    async def __aenter__(self) -> aiohttp.ClientSession:
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        await self.close()


async def main():
    async with AsyncClient() as client:

        for project in await client.core.projects.list():
            if "TestProject" in project.name:
                await client.core.projects.delete(project.id)

        await asyncio.gather(
            *[
                client.core.projects.create(name=f"TestProject_{num}", capabilities=STANDRAD_CAPABILITIES, description=f"TestDescription_{num}")
                for num in range(10)
            ]
        )

        for project in await client.core.projects.list():
            if "TestProject" in project.name:
                await client.core.projects.delete(project.id)


asyncio.run(main())
imonbln
User
Beiträge: 149
Registriert: Freitag 3. Dezember 2021, 17:07

Code: Alles auswählen


USER = "roger"
PAT = "izgaygo5xldxwoms2xy447lqy7trhtauqlprsi7k6y3u4uyptd2a"
INSTANCE = "roger-pc"
COLLECTION = "DefaultCollection"
STANDRAD_CAPABILITIES = {
    "versioncontrol": {"sourceControlType": "Git"},
    "processTemplate": {"templateTypeId": "6b724908-ef14-45cf-84f8-768b5384da45"},
}

Nur so aus Neugier. Du bist dir sicher, dass du hier nicht aus Versehen ein Credential veröffentlicht hast, das Zugriff auf deine Instanzen gewährt.

Daten sollte man immer vom Code trennen, also zum Beispiel, indem man die Daten in eine Konfigurationsdatei auslagert.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

imonbln hat geschrieben: Montag 13. Dezember 2021, 09:32 Nur so aus Neugier. Du bist dir sicher, dass du hier nicht aus Versehen ein Credential veröffentlicht hast, das Zugriff auf deine Instanzen gewährt.

Daten sollte man immer vom Code trennen, also zum Beispiel, indem man die Daten in eine Konfigurationsdatei auslagert.
Danke, aber daran habe ich natürlich gedacht. Der Server ist nicht öffentlich zugänglich.
Die Zugangsdaten würden in der Realität unabhängig vom Code sein.
Ich wollte es zur Demonstration nur mal hier mit einfügen.

Im Moment bastle ich daran wie ich die Klassen aufbaue, damit die Benutzung möglichst intuitiv ist.
Außerdem will ich Code-Wiederholungen reduzieren, denn die API ist recht umfangreich. Es kann sonst passieren, dass am Ende zigfach sehr ähnlicher Code entsteht.
Ich befürchte, ich habe auch noch Fehler in der Behandlung der aiohttp.ClientSession, die sich aktuell nicht bemerkbar machen, da der Server so langsam reagiert.
DasIch
User
Beiträge: 2718
Registriert: Montag 19. Mai 2008, 04:21
Wohnort: Berlin

Ich frage mich aber ob man das Konzept noch verbessern kann.
Da fallen ein paar Probleme auf:
  • Es gibt keinen sinnvollen weg um dass ganze zu Konfigurieren.
  • Timeouts sind nicht festgelegt (basierend auf SLOs/SLAs) noch sind diese konfigurierbar
  • Es gibt kein retry mit exponential backoff
  • Es fehlt logging oder distributed tracing (via OpenTelemetry oder OpenTracing)
Die letzten drei Punkte sind für Skripte nicht so interessant aber im Kontext einer Anwendung würde man die schon haben wollen.
Benutzeravatar
__blackjack__
User
Beiträge: 13071
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

Bei `STANDRAD_CAPABILITIES` könnte man jetzt fragen ob ein Standrad ein Fahrrad ist das nur rumsteht. 🤣 SCNR.

`snakify_str()` liesse sich mit einem Generatorausdruck für den `join()`-Aufruf kompakter schreiben.

Die Namen bei den `Enum`-Objekten sind Konstanten und sollten KOMPLETT_GROSS geschrieben werden. Und werden die eigentlich irgendwo sinnvoll konvertiert? Ich habe beim kurzen drüber schauen nichts entdecken können.

Womit testest Du denn die Typannotationen? Wenn man die verwendet, sollte man auch dafür sorgen, dass die korrekt sind, denn die versprechen dem Leser deutlich stärker als Kommentare was für einen Typ etwas haben *sollte*. `TeamProjectReference` ist *vielleicht* `str`, denn der `field()`-`default`-Wert sagt, das ist optional. Analog `TeamProject.capabilities`.

Bei `AsyncClient.__aenter__` ist die Typannotation falsch. `self` ist nicht vom Typ `ClientSession`.

Einfach `dict` als Typannotationen zu verwenden, wenn man mehr über den Inhalt weiss ist ein bisschen ”lazy”. Da geht mehr. Auf der anderen Seite ist das auch unnötig einschränkend an Stellen wo man mehr verarbeiten könnte als konkrete `dict`\s. `snakify_args()` will beispielsweise ein `Mapping[str, Any]` als Argument und liefert `Dict[str, Any]` als Ergebnis.

``in`` zu verwenden, wenn man eigentlich `startswith()` meint, kann blöde Konsequenzen haben. Man stelle sich mal vor jemand legt ein "UnitTestProject" an das kein Wegwerftestprojekt ist, und das dann einfach so gelöscht wird, weil "TestProject" drin vor kommt. 😱

Ungetestet:

Code: Alles auswählen

#!/usr/bin/env python3
"""
Async Azure DevOps Client

Microssoft Rest API documentation:
https://docs.microsoft.com/en-us/rest/api/azure/devops/core/projects?view=azure-devops-server-rest-5.0
"""
from __future__ import annotations

import asyncio
import time
from dataclasses import dataclass, field
from enum import Enum
from types import TracebackType
from typing import Any, Dict, List, Mapping, Optional, Type

import aiohttp
from aiohttp.helpers import BasicAuth

USER = "roger"
PAT = "izgaygo5xldxwoms2xy447lqy7trhtauqlprsi7k6y3u4uyptd2a"
INSTANCE = "roger-pc"
COLLECTION = "DefaultCollection"
STANDARD_CAPABILITIES = {
    "versioncontrol": {"sourceControlType": "Git"},
    "processTemplate": {
        "templateTypeId": "6b724908-ef14-45cf-84f8-768b5384da45"
    },
}


def snakify_str(string: str) -> str:
    if string.startswith("_"):
        string = string[1:]
    return "".join(
        f"_{character.lower()}" if character.isupper() else character
        for character in string
    )


def snakify_args(arguments: Mapping[str, Any]) -> Dict[str, Any]:
    """
    since the api returns fields in lower-camel-case
    they need to be converted to pythonic snake-case names
    """
    return {snakify_str(key): value for key, value in arguments.items()}


class ProjectState(Enum):
    ALL = "all"
    CREATE_PENDING = "createPending"
    DELETED = "deleted"
    DELETING = "deleting"
    NEW = "new"
    UNCHANGED = "unchanged"
    WELL_FORMED = "wellFormed"


class ProjectVisibility(Enum):
    PRIVATE = "private"
    PUBLIC = "public"


class OperationStatus(Enum):
    CANCELLED = "cancelled"
    FAILED = "failed"
    IN_PROGRESS = "inProgress"
    NOT_SET = "notSet"
    QUEUED = "queued"
    SUCCEEDED = "succeeded"


@dataclass
class TeamProjectReference:
    id: str
    name: str
    url: str
    state: ProjectState
    revision: int
    visibility: ProjectVisibility
    last_update_time: str
    description: Optional[str] = field(default=None)


@dataclass
class ReferenceLinks:
    links: Mapping


@dataclass
class WebApiTeamRef:
    id: str
    name: str
    url: str


@dataclass
class TeamProject:
    id: str
    name: str
    url: str
    state: ProjectState
    revision: int
    links: ReferenceLinks
    visibility: ProjectVisibility
    default_team: WebApiTeamRef
    last_update_time: str
    description: str
    capabilities: Optional[Mapping] = field(default=None)


@dataclass
class OperationReference:
    id: str
    status: OperationStatus
    url: str


async def await_status(
    session, url: str, status: OperationStatus, timeout: int = None
) -> None:
    start_time = time.time()
    while True:
        response = await session.get(url)
        content = await response.json()
        if content["status"] == status.value:
            return
        if timeout and time.time() - start_time > timeout:
            print("Timeout")
            return
        await asyncio.sleep(1)


class Projects:
    def __init__(self, session) -> None:
        self._base_url = f"http://{INSTANCE}/{COLLECTION}/_apis"
        self._session = session
        self._resource_url = f"{self._base_url}/projects"

    async def list(
        self,
        skip: int = None,
        top: int = None,
        continuation_token: str = None,
        get_default_team_image_url: bool = None,
        state_filter: ProjectState = None,
    ) -> List[TeamProjectReference]:
        url = self._resource_url
        params = {
            "$skip": skip,
            "$top": top,
            "continuationToken": continuation_token,
            "getDefaultTeamImageUrl": get_default_team_image_url,
            "stateFilter": state_filter,
            "api_version": "5.0",
        }
        params = {
            key: value for key, value in params.items() if value is not None
        }
        async with self._session.get(url, params=params) as response:
            content = await response.json()
            return [
                TeamProjectReference(**snakify_args(element))
                for element in content["value"]
            ]

    async def get(
        self,
        project_id: str,
        include_capabilities: bool = None,
        include_history: bool = None,
    ) -> TeamProject:
        url = f"{self._resource_url}/{project_id}"
        params = {
            "includeCapabilities": include_capabilities,
            "includeHistory": include_history,
        }
        async with self._session.get(url, params=params) as response:
            content = await response.json()
        return TeamProject(**snakify_args(content))

    async def create(
        self,
        name: str,
        capabilities: Mapping,
        description: str = None,
        url: str = None,
    ) -> None:
        url = self._resource_url
        params = {"api-version": "5.0"}
        payload = {
            "name": name,
            "description": description,
            "capabilities": capabilities,
        }
        async with self._session.post(
            url, params=params, json=payload
        ) as response:
            content = await response.json()
        operation_reference = OperationReference(**content)
        await await_status(
            self._session,
            operation_reference.url,
            OperationStatus.SUCCEEDED,
            timeout=120,
        )

    async def delete(self, project_id: str) -> None:
        url = f"{self._resource_url}/{project_id}"
        params = {"api-version": "5.0"}
        async with self._session.delete(url, params=params) as response:
            content = await response.json()
        operation_reference = OperationReference(**content)
        await await_status(
            self._session,
            operation_reference.url,
            OperationStatus.SUCCEEDED,
            timeout=120,
        )


class Core:
    def __init__(self, session) -> None:
        self._base_url = f"http://{INSTANCE}/{COLLECTION}/_apis"
        # Core resources
        self._session = session
        self.projects = Projects(self._session)
        # self.processes = Processes()
        # self.teams = Teams()


class AsyncClient:
    """Client class to give access to all services and resources"""

    def __init__(self) -> None:
        self._session = aiohttp.ClientSession(auth=BasicAuth(USER, PAT))
        # services
        # self.build = Build()
        self.core = Core(self._session)
        # self.dashboard = Dashboard()
        # ...

    async def close(self) -> None:
        await self._session.close()

    async def __aenter__(self) -> AsyncClient:
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        await self.close()


async def delete_test_projects(client: AsyncClient) -> None:
    for project in await client.core.projects.list():
        if project.name.startswith("TestProject_"):
            await client.core.projects.delete(project.id)


async def main() -> None:
    async with AsyncClient() as client:

        await delete_test_projects(client)

        await asyncio.gather(
            *[
                client.core.projects.create(
                    name=f"TestProject_{number}",
                    capabilities=STANDARD_CAPABILITIES,
                    description=f"TestDescription_{number}",
                )
                for number in range(10)
            ]
        )

        await delete_test_projects(client)


if __name__ == "__main__":
    asyncio.run(main())
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

DasIch hat geschrieben: Montag 13. Dezember 2021, 10:48
  1. Es gibt keinen sinnvollen weg um dass ganze zu Konfigurieren.
  2. Timeouts sind nicht festgelegt (basierend auf SLOs/SLAs) noch sind diese konfigurierbar
  3. Es gibt kein retry mit exponential backoff
  4. Es fehlt logging oder distributed tracing (via OpenTelemetry oder OpenTracing)
Danke für dein Feedback.

zu 1.: Aktuell weiß ich noch nicht welche Einstellungen überhaupt nötig sein werden. Daher reichen mir jetzt noch die Konstanten im Modul. Auf Dauer könnte ich mir dafür eine Klasse vorstellen, die alle nötigen Einstellungen enthält.
Es sollte Servereinstellungen, Benutzereinstellungen (Credentials) und weitere ?? geben.
zu 2.: Die ClientSession von aiohttp nimmt einen Timeout als Übergabeparameter an. Default ist 5 min. Bisher gab es keine Notwendigkeit das zu ändern. Ich weiß auch noch nicht wie auf Verbindungsfehler reagiert werden soll. Es kommt darauf an wofür man den Client nutzen möchte.
zu 3.: Das steht ehrlich gesagt, nicht hoch auf meiner Prioritätenliste. Hohe Verfügbarkeit ist für mein Hobbyprojekt erstmal nebensächlich.
zu 4.: Ja logging muss sicher bald rein, allein schon zur Fehlersuche. Asyncio verwendet wohl einen eigenen Logger den man nutzen kann. Zusätzlich brauche ich aber noch weitere Logging Funktionen.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@__blackjack__,

Danke dir auch für die hilfreichen Punkte.
`snakify_str()` liesse sich mit einem Generatorausdruck für den `join()`-Aufruf kompakter schreiben.
Ja, da habe ich zu kompliziert gedacht. Ich fand die Funktion etwas gehacked, aber mir ist nichts besseres eingefallen um aus dem camelCase JSON, snake_case Python zu machen.
Die Namen bei den `Enum`-Objekten sind Konstanten und sollten KOMPLETT_GROSS geschrieben werden. Und werden die eigentlich irgendwo sinnvoll konvertiert?
Anscheinend auch ein Denkfehler. Auch da hatte ich gedacht, dass ich die Schreibweise wichtig für die Deserialisierung ist. Ich möchte Enums verwenden, da ich mir die Parameternamen sonst nicht merken kann.
Man kann zum Beispiel beim Listen der Projekte einen State-Filter übergeben:

Code: Alles auswählen

await client.core.projects.list(state_filter=ProjectState.WELL_FORMED.value)
So sind die möglichen ProjectStates klar erkennbar. Aber dass man da bis auf value runtergehen muss um an den entsprechenden String zu kommen ist etwas unschön.

Was die Typeannotationen betrifft, war ich mir nicht immer sicher. Deine Kommentare machen aber Sinn.

Bei AsyncClient Klasse war ich auch sehr unentschlossen wie es umzusetzen ist. Daher wohl der Fehler mit dem falschen Rückgabetyp.
Aktuell wird ja in der Klasse die aiohttp.ClientSession angelegt. Die muss dann aber leider an alle anderen Klassen die einen Service bereitstellen weitergereicht werden, damit sie die Session verwenden können.
Eigentlich wollte ich eine Basis-Klasse anlegen, in der nur einmal die ClientSession erstellt wird. Alle anderen Klassen könnten dann durch Vererbung auf die Session zugreifen. Das habe ich bisher noch nicht hinbekommen, da nicht mit der Async-Vorgehensweise passt, für mich jedenfalls noch nicht.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

So, ich habe ein paar (hoffentlich) Verbesserungen eingebaut:
1) Logging:
Es gibt auf Module-Ebene einen Logger:

Code: Alles auswählen

logging.basicConfig(format="%(asctime)s:%(levelname)s:%(message)s", level=logging.INFO)
LOG = logging.getLogger(__name__)

class AsyncClient:
    """Client class to give access to all services and resources"""

    def __init__(self, connection, credentials, debug=False) -> None:
        if debug:
            LOG.setLevel(logging.DEBUG)
        else:
            LOG.setLevel(logging.INFO)
        LOG.debug("Connecting client session")
        self._session = aiohttp.ClientSession(auth=BasicAuth(credentials.username, credentials.pat))
        self._core = Core(self._session, connection)
    ...
Ich bin mir aber nicht sicher, ob das eine gute Idee ist, irgendwie ist passt es nicht dass die Klasse den globalen Logger konfigurieren darf.

2) Credentials und Connection Parameter in zwei Klassen zur besseren Konfiguration.

Code: Alles auswählen

@dataclass(frozen=True)
class Credentials:
    username: str
    pat: str


@dataclass(frozen=True)
class Connection:
    instance: str
    collection: str
3) Workaround für asyncio-Problem unter Windows:

Code: Alles auswählen

if __name__ == "__main__":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(main(), debug=True)
Dies muss noch abhängig vom verwendeten OS gesetzt werden.

4. Basis-Klasse für die Enums, damit der Zugriff "ProjectState.ALL" nur "all" zurückgibt

Code: Alles auswählen

class StrEnum(Enum):
    def __repr__(self) -> str:
        return self.value


class ProjectState(StrEnum):
    ALL = "all"
    CREATE_PENDING = "createPending"
    DELETED = "deleted"
    DELETING = "deleting"
    NEW = "new"
    UNCHANGED = "unchanged"
    WELL_FORMED = "wellFormed"
__deets__
User
Beiträge: 14523
Registriert: Mittwoch 14. Oktober 2015, 14:29

Die Konfiguration des loggers gehört da nicht hin. Sinn des logging Moduls ist es, zentral alle Quellen konfigurieren zu können. Ohne das alle möglichen Schichten für sich & ihre Abhängigkeiten dafür Parameter durchschleifen.
Benutzeravatar
__blackjack__
User
Beiträge: 13071
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@rogerb: Vor allem sollte man dafür sorgen, dass Bibliotheken nicht ungefragt protokollieren. Also Logger benutzen schon, aber die Konfiguration sollte der Benutzer der Bibliothek in der Hand haben. Die Bibliothek sollte am besten einen NullHandler setzen. Und auf keinen Fall `basicConfig()` aufrufen.

Die `__repr__()` ist keine gute Idee. Das kann man für `__str__()` machen, aber bei `__repr__()` sollte man sich an die üblichen Konventionen halten: Entweder gibt das was aus was man in einen Python-Quelltext kopieren kann und dass dann wieder so ein Objekt ergibt, oder es ist in ”spitze Klammern” eingefasst.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Hmm, okay, ich weiß für den __repr__ gibt es die Konvention. Ich hätte gerne, dass beim Zugriff auf ein Enum nur der zugeordnete string zurück kommt. Die spitzen Klammern versteht die API ja nicht und __str__ hilft mir nur bei der Ausgabe. Dafür sind die Enums aber in meinem Fall nicht vorgesehen.
Ich habs zurück gebaut und muss mal überlegen, ob ich eine andere Lösung finde.

Der Hinweis zu logging war sehr hilfreich. Ich hatte da ein falsches Verständnis.
Jetzt habe ich ein Modul angelegt in dem ein Logger mit einem eigenen Namen angelegt wird,

Code: Alles auswählen

import logging

logger = logging.getLogger(__package__)
welchen ich dann in anderen Modulen importieren und verwenden kann.

Als Anwender habe ich dann die Möglichkeit mir diesen Logger zu holen und die entsprechenden Handler dafür zu konfigurieren.
Jeder Anwender kann das auf seine Art umkonfigurieren.

In meiner main.py sieht das so aus:

Code: Alles auswählen

def setup_logging():
    formatter = logging.Formatter("%(asctime)s:%(levelname)s:%(name)s:%(message)s")
    file_handler = logging.FileHandler("adosclient.log")
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.WARNING)
    console_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    logger.setLevel(logging.DEBUG)


if __name__ == "__main__":
    setup_logging()
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(main())
Antworten