|
3 | 3 | # SPDX-License-Identifier: GPL-3.0-or-later
|
4 | 4 |
|
5 | 5 | """
|
6 |
| -Client wrapper for initializing a connection to the openvasd HTTP API using optional mTLS authentication. |
| 6 | +http client for initializing a connection to the openvasd HTTP API using optional mTLS authentication. |
7 | 7 | """
|
8 | 8 |
|
9 | 9 | import ssl
|
| 10 | +from os import PathLike |
10 | 11 | from typing import Optional, Tuple, Union
|
11 | 12 |
|
12 | 13 | from httpx import Client
|
13 | 14 |
|
| 15 | +StrOrPathLike = Union[str, PathLike[str]] |
14 | 16 |
|
15 |
| -class OpenvasdClient: |
| 17 | + |
| 18 | +def create_openvasd_http_client( |
| 19 | + host_name: str, |
| 20 | + *, |
| 21 | + api_key: Optional[str] = None, |
| 22 | + server_ca_path: Optional[StrOrPathLike] = None, |
| 23 | + client_cert_paths: Optional[ |
| 24 | + Union[StrOrPathLike, Tuple[StrOrPathLike, StrOrPathLike]] |
| 25 | + ] = None, |
| 26 | + port: int = 3000, |
| 27 | +) -> Client: |
16 | 28 | """
|
17 |
| - The client wrapper around `httpx.Client` configured for mTLS-secured access or API KEY |
| 29 | + Create a `httpx.Client` configured for mTLS-secured or API KEY access |
18 | 30 | to an openvasd HTTP API instance.
|
19 |
| - """ |
20 |
| - |
21 |
| - def __init__( |
22 |
| - self, |
23 |
| - host_name: str, |
24 |
| - *, |
25 |
| - api_key: Optional[str] = None, |
26 |
| - server_ca_path: Optional[str] = None, |
27 |
| - client_cert_paths: Optional[Union[str, Tuple[str, str]]] = None, |
28 |
| - port: int = 3000, |
29 |
| - ): |
30 |
| - """ |
31 |
| - Initialize the OpenVASD HTTP client with optional mTLS and API key. |
32 | 31 |
|
33 |
| - Args: |
34 |
| - host_name: Hostname or IP of the OpenVASD server (e.g., "localhost"). |
35 |
| - api_key: Optional API key used for authentication via HTTP headers. |
36 |
| - server_ca_path: Path to the server's CA certificate (for verifying the server). |
37 |
| - client_cert_paths: Path to the client certificate (str) or a tuple of |
38 |
| - (cert_path, key_path) for mTLS authentication. |
39 |
| - port: The port to connect to (default: 3000). |
| 32 | + Args: |
| 33 | + host_name: Hostname or IP of the OpenVASD server (e.g., "localhost"). |
| 34 | + api_key: Optional API key used for authentication via HTTP headers. |
| 35 | + server_ca_path: Path to the server's CA certificate (for verifying the server). |
| 36 | + client_cert_paths: Path to the client certificate (str) or a tuple of |
| 37 | + (cert_path, key_path) for mTLS authentication. |
| 38 | + port: The port to connect to (default: 3000). |
40 | 39 |
|
41 |
| - Behavior: |
42 |
| - - If both `server_ca_path` and `client_cert_paths` are set, an mTLS connection |
43 |
| - is established using an SSLContext. |
44 |
| - - If not, `verify` is set to False (insecure), and HTTP is used instead of HTTPS. |
45 |
| - HTTP connection needs api_key for authorization. |
46 |
| - """ |
47 |
| - headers = {} |
| 40 | + Behavior: |
| 41 | + - If both `server_ca_path` and `client_cert_paths` are set, an mTLS connection |
| 42 | + is established using an SSLContext. |
| 43 | + - If not, `verify` is set to False (insecure), and HTTP is used instead of HTTPS. |
| 44 | + HTTP connection needs api_key for authorization. |
| 45 | + """ |
| 46 | + headers = {} |
48 | 47 |
|
49 |
| - context: Optional[ssl.SSLContext] = None |
| 48 | + context: Optional[ssl.SSLContext] = None |
50 | 49 |
|
51 |
| - # Prepare mTLS SSL context if needed |
52 |
| - if client_cert_paths and server_ca_path: |
53 |
| - context = ssl.create_default_context( |
54 |
| - ssl.Purpose.SERVER_AUTH, cafile=server_ca_path |
| 50 | + # Prepare mTLS SSL context if needed |
| 51 | + if client_cert_paths and server_ca_path: |
| 52 | + context = ssl.create_default_context( |
| 53 | + ssl.Purpose.SERVER_AUTH, cafile=server_ca_path |
| 54 | + ) |
| 55 | + if isinstance(client_cert_paths, tuple): |
| 56 | + context.load_cert_chain( |
| 57 | + certfile=client_cert_paths[0], keyfile=client_cert_paths[1] |
55 | 58 | )
|
56 |
| - if isinstance(client_cert_paths, tuple): |
57 |
| - context.load_cert_chain( |
58 |
| - certfile=client_cert_paths[0], keyfile=client_cert_paths[1] |
59 |
| - ) |
60 |
| - else: |
61 |
| - context.load_cert_chain(certfile=client_cert_paths) |
| 59 | + else: |
| 60 | + context.load_cert_chain(certfile=client_cert_paths) |
62 | 61 |
|
63 |
| - context.check_hostname = False |
64 |
| - context.verify_mode = ssl.CERT_REQUIRED |
| 62 | + context.check_hostname = False |
| 63 | + context.verify_mode = ssl.CERT_REQUIRED |
65 | 64 |
|
66 |
| - # Set verify based on context presence |
67 |
| - verify: Union[bool, ssl.SSLContext] = context if context else False |
| 65 | + # Set verify based on context presence |
| 66 | + verify: Union[bool, ssl.SSLContext] = context if context else False |
68 | 67 |
|
69 |
| - if api_key: |
70 |
| - headers["X-API-KEY"] = api_key |
| 68 | + if api_key: |
| 69 | + headers["X-API-KEY"] = api_key |
71 | 70 |
|
72 |
| - protocol = "https" if context else "http" |
73 |
| - base_url = f"{protocol}://{host_name}:{port}" |
| 71 | + protocol = "https" if context else "http" |
| 72 | + base_url = f"{protocol}://{host_name}:{port}" |
74 | 73 |
|
75 |
| - self.client = Client( |
76 |
| - base_url=base_url, |
77 |
| - headers=headers, |
78 |
| - verify=verify, |
79 |
| - http2=True, |
80 |
| - timeout=10.0, |
81 |
| - ) |
| 74 | + return Client( |
| 75 | + base_url=base_url, |
| 76 | + headers=headers, |
| 77 | + verify=verify, |
| 78 | + http2=True, |
| 79 | + timeout=10.0, |
| 80 | + ) |
0 commit comments