# coding=utf-8
import logging
import uuid
from dataclasses import dataclass
from homematicip.connection.connection_context import ConnectionContext
from homematicip.connection.rest_connection import RestResult, RestConnection
LOGGER = logging.getLogger(__name__)
[docs]
@dataclass
class Auth:
"""This class generates the auth token for the homematic ip access point."""
client_id: str = str(uuid.uuid4())
header: dict = None
accesspoint_id: str = None
pin: str = None
connection: RestConnection = None
def __init__(self, connection: RestConnection, client_auth_token: str, accesspoint_id: str):
"""Initialize the auth object.
@param connection: The connection object
@param client_auth_token: The client auth token
@param accesspoint_id: The access point id
"""
LOGGER.debug("Initialize new Auth")
self.accesspoint_id = accesspoint_id
self.connection = connection
self.headers = {
"content-type": "application/json",
"accept": "application/json",
"VERSION": "12",
"CLIENTAUTH": client_auth_token,
"ACCESSPOINT-ID": self.accesspoint_id,
}
[docs]
def set_pin(self, pin: str):
"""Set the pin for the auth object.
@param pin: The pin"""
self.pin = pin
[docs]
async def connection_request(self, access_point: str, device_name="homematicip-python") -> RestResult:
LOGGER.debug(f"Requesting connection for access point {access_point}")
headers = self.headers
if self.pin is not None:
headers["PIN"] = self.pin
data = {
"deviceId": self.client_id,
"deviceName": device_name,
"sgtin": access_point
}
return await self.connection.async_post("auth/connectionRequest", data, headers)
[docs]
async def is_request_acknowledged(self) -> bool:
LOGGER.debug("Checking if request is acknowledged")
data = {
"deviceId": self.client_id,
"accessPointId": self.accesspoint_id
}
result = await self.connection.async_post("auth/isRequestAcknowledged", data, self.headers)
LOGGER.debug(f"Request acknowledged result: {result}")
return result.status == 200
[docs]
async def request_auth_token(self) -> str:
"""Request an auth token from the access point.
@return: The auth token"""
LOGGER.debug("Requesting auth token")
data = {"deviceId": self.client_id}
result = await self.connection.async_post("auth/requestAuthToken", data, self.headers)
LOGGER.debug(f"Request auth token result: {result}")
return result.json["authToken"]
[docs]
async def confirm_auth_token(self, auth_token: str) -> str:
"""Confirm the auth token and get the client id.
@param auth_token: The auth token
@return: The client id"""
LOGGER.debug("Confirming auth token")
data = {"deviceId": self.client_id, "authToken": auth_token}
result = await self.connection.async_post("auth/confirmAuthToken", data, self.headers)
LOGGER.debug(f"Confirm auth token result: {result}")
return result.json["clientId"]
#
# class Auth(object):
# def __init__(self, home: Home):
# self.uuid = str(uuid.uuid4())
# self.headers = {
# "content-type": "application/json",
# "accept": "application/json",
# "VERSION": "12",
# "CLIENTAUTH": home._connection.clientauth_token,
# }
# self.url_rest = home._connection.urlREST
# self.pin = None
#
# def connectionRequest(
# self, access_point, devicename="homematicip-python"
# ) -> requests.Response:
# data = {"deviceId": self.uuid, "deviceName": devicename, "sgtin": access_point}
# headers = self.headers
# if self.pin != None:
# headers["PIN"] = self.pin
# response = requests.post(
# "{}/hmip/auth/connectionRequest".format(self.url_rest),
# json=data,
# headers=headers,
# )
# return response
#
# def isRequestAcknowledged(self):
# data = {"deviceId": self.uuid}
# response = requests.post(
# "{}/hmip/auth/isRequestAcknowledged".format(self.url_rest),
# json=data,
# headers=self.headers,
# )
# return response.status_code == 200
#
# def requestAuthToken(self):
# data = {"deviceId": self.uuid}
# response = requests.post(
# "{}/hmip/auth/requestAuthToken".format(self.url_rest),
# json=data,
# headers=self.headers,
# )
# return json.loads(response.text)["authToken"]
#
# def confirmAuthToken(self, authToken):
# data = {"deviceId": self.uuid, "authToken": authToken}
# response = requests.post(
# "{}/hmip/auth/confirmAuthToken".format(self.url_rest),
# json=data,
# headers=self.headers,
# )
# return json.loads(response.text)["clientId"]