Source code for homematicip.base.base_connection

import hashlib
import locale
import platform
import re
#
# ATTR_AUTH_TOKEN = "AUTHTOKEN"
# ATTR_CLIENT_AUTH = "CLIENTAUTH"


[docs] class HmipConnectionError(Exception): pass
[docs] class HmipWrongHttpStatusError(HmipConnectionError): def __init__(self, status_code=None): self.status_code = status_code def __str__(self): return f"HmipWrongHttpStatusError({self.status_code})"
[docs] class HmipServerCloseError(HmipConnectionError): pass
[docs] class HmipThrottlingError(HmipConnectionError): pass
# # class BaseConnection: # """Base connection class. # # Threaded and Async connection class must inherit from this.""" # # _auth_token = "" # _clientauth_token = "" # _urlREST = "" # _urlWebSocket = "" # # the homematic ip cloud tends to time out. retry the call X times. # _restCallRequestCounter = 3 # _restCallTimout = 6 # # def __init__(self): # self.headers = { # "content-type": "application/json", # "accept": "application/json", # "VERSION": "12", # ATTR_AUTH_TOKEN: None, # ATTR_CLIENT_AUTH: None, # } # lang = "en_US" # def_locale = locale.getlocale() # if def_locale != None and def_locale[0] != None: # lang = def_locale[0] # # self._clientCharacteristics = { # "clientCharacteristics": { # "apiVersion": "10", # "applicationIdentifier": "homematicip-python", # "applicationVersion": "1.0", # "deviceManufacturer": "none", # "deviceType": "Computer", # "language": lang, # "osType": platform.system(), # "osVersion": platform.release(), # }, # "id": None, # } # # @property # def clientCharacteristics(self): # return self._clientCharacteristics # # @property # def urlWebSocket(self): # return self._urlWebSocket # # @property # def urlREST(self): # return self._urlREST # # @property # def auth_token(self): # return self._auth_token # # @property # def clientauth_token(self): # return self._clientauth_token # # def set_token_and_characteristics(self, accesspoint_id): # accesspoint_id = re.sub(r"[^a-fA-F0-9 ]", "", accesspoint_id).upper() # self._clientCharacteristics["id"] = accesspoint_id # self._clientauth_token = ( # hashlib.sha512(str(accesspoint_id + "jiLpVitHvWnIGD1yo7MA").encode("utf-8")) # .hexdigest() # .upper() # ) # self.headers[ATTR_CLIENT_AUTH] = self._clientauth_token # # def set_auth_token(self, auth_token): # self._auth_token = auth_token # self.headers[ATTR_AUTH_TOKEN] = auth_token # # def init(self, accesspoint_id, lookup=True, **kwargs): # raise NotImplementedError # # def _rest_call(self, path, body=None): # raise NotImplementedError