Source code for openbankproject_client.auth
"""
Authentication utilities for the OpenBankProject API Client.
"""
import base64
import json
import time
from typing import Dict, Optional
[docs]
class DirectLoginAuth:
"""
Authentication handler for DirectLogin authentication method.
"""
[docs]
def __init__(self, username: Optional[str] = None, password: Optional[str] = None,
consumer_key: Optional[str] = None, token: Optional[str] = None):
"""
Initialize DirectLogin authentication.
Args:
username: User's username (required if token not provided)
password: User's password (required if token not provided)
consumer_key: Consumer key for the application (required if token not provided)
token: Pre-generated DirectLogin token (optional)
"""
self.username = username
self.password = password
self.consumer_key = consumer_key
self.token = token
if not token and not (username and password and consumer_key):
raise ValueError("Either token or (username, password, consumer_key) must be provided")
[docs]
def get_headers(self) -> Dict[str, str]:
"""
Get authentication headers.
Returns:
Dict containing the Authorization header
"""
if self.token:
return {"Authorization": f"DirectLogin token={self.token}"}
else:
auth_string = f'DirectLogin username="{self.username}", password="{self.password}", consumer_key="{self.consumer_key}"'
return {"Authorization": auth_string}
[docs]
def authenticate(self, client) -> bool:
"""
Authenticate with the API and get a token if needed.
Args:
client: OpenBankProjectClient instance
Returns:
True if authentication was successful
"""
if self.token:
# Already have a token, no need to authenticate
return True
try:
# Make a request to get a token
# This is a placeholder - the actual endpoint may vary
response = client.request(
'GET',
'my/logins/direct',
headers=self.get_headers()
)
if 'token' in response:
self.token = response['token']
return True
return False
except Exception:
return False
[docs]
class GatewayLoginAuth:
"""
Authentication handler for GatewayLogin authentication method.
"""
[docs]
def __init__(self, token: str):
"""
Initialize GatewayLogin authentication.
Args:
token: GatewayLogin JWT token
"""
self.token = token
[docs]
def get_headers(self) -> Dict[str, str]:
"""
Get authentication headers.
Returns:
Dict containing the Authorization header
"""
return {"Authorization": f"GatewayLogin token={self.token}"}
[docs]
def authenticate(self, client) -> bool:
"""
Authenticate with the API.
Args:
client: OpenBankProjectClient instance
Returns:
True if authentication was successful
"""
# GatewayLogin uses a pre-generated token, so no authentication request is needed
return True
[docs]
class Authentication:
"""Authentication utilities for the OpenBankProject API."""
[docs]
@staticmethod
def create_direct_login_header(username: str, password: str, consumer_key: str) -> Dict[str, str]:
"""
Create a DirectLogin authorization header.
Args:
username: User's username
password: User's password
consumer_key: Consumer key for the application
Returns:
Dict containing the Authorization header
"""
auth_string = f'DirectLogin username="{username}", password="{password}", consumer_key="{consumer_key}"'
return {"Authorization": auth_string}
[docs]
@staticmethod
def create_gateway_login_header(jwt: str) -> Dict[str, str]:
"""
Create a GatewayLogin authorization header.
Args:
jwt: JSON Web Token for authentication
Returns:
Dict containing the Authorization header
"""
return {"Authorization": f"GatewayLogin token={jwt}"}
[docs]
@staticmethod
def create_token_header(token: str, auth_type: str = "DirectLogin") -> Dict[str, str]:
"""
Create an authorization header using a token.
Args:
token: Authentication token
auth_type: Authentication type (DirectLogin or GatewayLogin)
Returns:
Dict containing the Authorization header
"""
if auth_type not in ["DirectLogin", "GatewayLogin"]:
raise ValueError("auth_type must be either 'DirectLogin' or 'GatewayLogin'")
return {"Authorization": f"{auth_type} token={token}"}
[docs]
@staticmethod
def generate_jwt(consumer_key: str,
consumer_secret: str,
user_id: Optional[str] = None,
expiry_seconds: int = 3600) -> str:
"""
Generate a JWT token for GatewayLogin.
This is a simplified implementation and may need to be adjusted based on
the specific requirements of the OpenBankProject API.
Args:
consumer_key: Consumer key for the application
consumer_secret: Consumer secret for the application
user_id: Optional user ID to include in the token
expiry_seconds: Token expiry time in seconds (default: 1 hour)
Returns:
JWT token string
"""
# Create header
header = {
"alg": "HS256",
"typ": "JWT"
}
# Create payload
now = int(time.time())
payload = {
"iss": consumer_key,
"iat": now,
"exp": now + expiry_seconds
}
if user_id:
payload["sub"] = user_id
# Encode header and payload
header_encoded = base64.urlsafe_b64encode(json.dumps(header).encode()).decode().rstrip('=')
payload_encoded = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode().rstrip('=')
# Create signature
import hmac
import hashlib
signature_data = f"{header_encoded}.{payload_encoded}"
signature = hmac.new(
consumer_secret.encode(),
signature_data.encode(),
hashlib.sha256
).digest()
signature_encoded = base64.urlsafe_b64encode(signature).decode().rstrip('=')
# Combine to create JWT
return f"{header_encoded}.{payload_encoded}.{signature_encoded}"