play with dataclass

This commit is contained in:
2026-01-30 14:52:23 +01:00
parent ac73bcde03
commit 381d06ae58

View File

@@ -19,6 +19,14 @@ from prometheus_client import (
) )
@dataclass
class RedfishSession:
"""Container for Redfish session data."""
token: str | None = None
loggout_url: str | None = None
vendor: str | None = None
@dataclass @dataclass
class HostConfig: class HostConfig:
"""Solve too many arguments""" """Solve too many arguments"""
@@ -27,11 +35,12 @@ class HostConfig:
username: str username: str
password: str password: str
chassis: list[str] | None = None chassis: list[str] | None = None
max_retries: int = 1 max_retries: int = 3 # 3 retires
backoff: int = 2 backoff: int = 2 # wait 2 seconds
cool_down: int = 120 # seconds to wait after too many failures cool_down: int = 120 # seconds to wait after too many failures
failures: int = 0 failures: int = 0
next_retry_time: float = field(default=0.0, init=False) next_retry_time: float = field(default=0.0, init=False)
session: RedfishSession = field(default_factory=RedfishSession)
# New attributes for Redfish stuff # New attributes for Redfish stuff
vendor: str | None = None vendor: str | None = None
@@ -96,8 +105,51 @@ async def process_request(t):
await asyncio.sleep(t) await asyncio.sleep(t)
async def probe_vendor(session, host: HostConfig) -> str | None:
"""Probe the vendor of the Redfish host."""
try:
async with session.get(
f"https://{host.fqdn}/redfish/v1/", ssl=False, timeout=10
) as resp:
if resp.status == 200:
data = await resp.json()
vendor = data.get("Vendor", "")
logging.debug("Detected vendor for %s: %s", host.fqdn, vendor)
return vendor
logging.warning(
"Vendor probe failed on %s: HTTP %s", host.fqdn, resp.status
)
except Exception as e:
logging.warning("Vendor probe failed for %s: %s", host.fqdn, e)
return None
async def login_hpe(session, host: HostConfig) -> bool:
"""Login to HPE Redfish API and set session token."""
login_url = f"https://{host.fqdn}/redfish/v1/SessionService/Sessions"
payload = {"UserName": host.username, "Password": host.password}
try:
async with session.post(login_url, json=payload, ssl=False, timeout=10) as login_resp:
if login_resp.status == 201:
host.session_token = login_resp.headers.get("X-Auth-Token")
host.session_logout = login_resp.headers.get("Location")
if not host.session.token or not host.session.logout_url:
raise RuntimeError("Invalid login response")
logging.info("New session token obtained for %s", host.fqdn)
return True
logging.warning(
"Login failed for %s: HTTP %s", host.fqdn, login_resp.status
)
except Exception as e:
logging.warning("Login failed for %s: %s", host.fqdn, e)
return False
async def fetch_with_retry(session, host: HostConfig, url: str) -> dict | None: async def fetch_with_retry(session, host: HostConfig, url: str) -> dict | None:
"""Fetch JSON from Redfish with retry/backoff""" """Fetch JSON from Redfish with retry/backoff."""
if host.should_skip(): if host.should_skip():
logging.warning( logging.warning(
"Skipping %s (in cool-down until %.1f)", host.fqdn, host.next_retry_time "Skipping %s (in cool-down until %.1f)", host.fqdn, host.next_retry_time
@@ -105,63 +157,24 @@ async def fetch_with_retry(session, host: HostConfig, url: str) -> dict | None:
UP_GAUGE.labels(host=host.fqdn).set(0) UP_GAUGE.labels(host=host.fqdn).set(0)
return None return None
if not host.vendor: # Probe vendor if not already known
try: if not host.session.vendor:
async with session.get( host.session.vendor = await probe_vendor(session, host)
f"https://{host.fqdn}/redfish/v1/", ssl=False, timeout=10
) as resp:
if resp.status == 200:
data = await resp.json()
host.vendor = data.get("Vendor", "")
logging.debug("Detected vendor for %s: %s", host.fqdn, host.vendor)
else:
logging.warning(
"Vendor probe failed on %s: HTTP %s", host.fqdn, resp.status
)
except Exception as e:
logging.warning("Vendor probe failed for %s: %s", host.fqdn, e)
is_hpe = host.vendor and host.vendor.strip().upper().startswith("HPE") is_hpe = host.session.vendor and host.session.vendor.strip().upper().startswith("HPE")
for attempt in range(1, host.max_retries + 1): for attempt in range(1, host.max_retries + 1):
try: try:
headers = {} headers = {}
if is_hpe: if is_hpe:
# Try to reuse existing session token # Handle HPE session token
if host.session_token: if not host.session.token:
headers["X-Auth-Token"] = host.session_token if not await login_hpe(session, host):
logging.debug("Reusing cached session token for %s", host.fqdn) # Retry login next attempt
else: continue
# Need to login and store new session token
# HPE Redfish login headers["X-Auth-Token"] = host.session.token
login_url = (
f"https://{host.fqdn}/redfish/v1/SessionService/Sessions"
)
payload = {"UserName": host.username, "Password": host.password}
async with session.post(
login_url, json=payload, ssl=False, timeout=10
) as login_resp:
if login_resp.status == 201:
host.session_token = login_resp.headers.get(
"X-Auth-Token"
) # as response in header
if not host.session_token:
raise RuntimeError("No X-Auth-Token in login response")
host.session_logout = login_resp.headers.get(
"Location"
) # as response in header
if not host.session_logout:
raise RuntimeError("No Location in login response")
headers["X-Auth-Token"] = host.session_token
logging.info("New session token obtained for %s", host.fqdn)
else:
logging.warning(
"Login failed for %s: HTTP %s",
host.fqdn,
login_resp.status,
)
continue # retry login next attempt
async with session.get( async with session.get(
url, headers=headers, ssl=False, timeout=10 url, headers=headers, ssl=False, timeout=10
@@ -174,7 +187,7 @@ async def fetch_with_retry(session, host: HostConfig, url: str) -> dict | None:
logging.warning( logging.warning(
"Invalid token for %s, reauthenticating...", host.fqdn "Invalid token for %s, reauthenticating...", host.fqdn
) )
host.session_token = None host.session.token = None
continue continue
logging.warning( logging.warning(
"HTTP %s from %s (attempt %d)", resp.status, host.fqdn, attempt "HTTP %s from %s (attempt %d)", resp.status, host.fqdn, attempt
@@ -524,15 +537,13 @@ async def get_system_info(session, host: HostConfig):
async def logout_host(session, host): async def logout_host(session, host):
"""Clean logout for Redfish with session tokens""" """Clean logout for Redfish with session tokens"""
if not host.session_token: if not host.session.token or not host.session.logout_url:
return
if not host.session_logout:
return return
try: try:
logout_url = f"{host.session_logout}" # the full URL is here! logout_url = host.session.logout_url
async with session.delete( async with session.delete(
logout_url, logout_url,
headers={"X-Auth-Token": host.session_token}, headers={"X-Auth-Token": host.session.token},
ssl=False, ssl=False,
timeout=5, timeout=5,
) as resp: ) as resp:
@@ -545,7 +556,8 @@ async def logout_host(session, host):
except Exception as e: except Exception as e:
logging.warning("Error during logout for %s: %s", host.fqdn, e) logging.warning("Error during logout for %s: %s", host.fqdn, e)
finally: finally:
host.session_token = None host.session.token = None
host.session.logout_url = None
async def run_exporter(config, stop_event): async def run_exporter(config, stop_event):