Source code for OMMClient.OMMClient

from threading import Thread, Event, Lock
from OMUser import OMMUser
from OMDevice import OMMDevice
from time import sleep
from events import Events
from utils import *
from messagehelper import *
import socket
import ssl
import Queue


# noinspection PyMissingConstructor
[docs]class OMMClient(Events): """ """ _host = None _port = None _tcp_socket = None _ssl_socket = None _send_q = None _recv_q = None _worker = None _dispatcher = None _sequence = 0 _sequencelock = Lock() _terminate = False _events = {} _eventlock = Lock() _modulus = None _exponent = None _logged_in = False omm_status = {} omm_versions = {} __events__ = ('on_RFPState', 'on_HealthState', 'on_DECTSubscriptionMode', 'on_PPDevCnf') def __init__(self, host, port=12622): """ Initializes a new OMM Client using destination address and port Args: host (str): address of the server running OMM port (int): port the OMM service is listening """ self._host = host self._port = port self._tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._tcp_socket.settimeout(10) self._ssl_socket = ssl.wrap_socket(self._tcp_socket) self._send_q = Queue.Queue() self._recv_q = Queue.Queue() self._worker = Thread(target=self._work) self._worker.daemon = True self._dispatcher = Thread(target=self._dispatch) self._dispatcher.daemon = True def __getattr__(self, name): """ Check if new property and Named like Eventhandler Args: name: Returns: """ if name not in self.__dict__ and name in self.__events__: omm_event = name.split("_")[1] self.subscribe_event(omm_event) return Events.__getattr__(self, name) def _get_sequence(self): with self._sequencelock: sequence = self._sequence self._sequence += 1 return sequence def _awaitresponse(self, message): if message in self._events: raise Exception("Already waiting for "+message) e = Event() with self._eventlock: self._events[message] = {} self._events[message]["event"] = e self._events[message]["event"].wait() data = parse_message(self._events[message]["response"]) with self._eventlock: self._events.pop(message) return data def _sendrequest(self, message, messagedata=None, children=None): """ Args: message: messagedata: children: Returns: """ msg = construct_message(message, messagedata, children) self._send_q.put(msg) responsemssage = message+"Resp" if messagedata is not None and "seq" in messagedata: responsemssage += str(messagedata["seq"]) return self._awaitresponse(responsemssage)
[docs] def login(self, user, password, ommsync=False): """ login to OMM with given credentials Method used to login before executing any command except get_versions against the OMM. The login can be performed in two modes to support different sets of features. The normal OMPClient login is restricted to all operations which can be performed using the OMP client application. Using the OMM sync flag you will be able to perform further operations. These operations are: * Attach user profile to device (Login) * Detach user profile from device (Logout) * Write extended device information (read only in OMP) .. note:: Some operations in OMM-Sync mode might lead to destroy DECT paring. Args: user (str): Username to be used to login password (str): Password to be used to login ommsync (bool): If True login as OMM-Sync client """ messagedata = { "protocolVersion": "45", "username": user, "password": password } if ommsync is True: messagedata["UserDeviceSyncClient"] = "true" else: messagedata["OMPClient"] = "1" self._ssl_socket.connect((self._host, self._port)) self._worker.start() self._dispatcher.start() message, attributes, children = self._sendrequest("Open", messagedata) self._modulus = children["publicKey"]["modulus"] self._exponent = children["publicKey"]["exponent"] self.omm_status = attributes self.omm_versions = self.get_versions() self._logged_in = True
def _ensure_login(self): if not self._logged_in: raise Exception("OMMClient not logged in")
[docs] def get_account(self, uid): """ Get System Account Args: uid: """ self._ensure_login() message, attributes, children = self._sendrequest("GetAccount", {"id": uid}) return attributes
[docs] def subscribe_event(self, event): """ Subscribes to any event specified Args: event: """ self._ensure_login() self._sendrequest("Subscribe", {}, {"e": {"cmd": "On", "eventType": event}})
[docs] def get_sari(self): """ Fetches the configured SARI Returns: Configured SARI for the OMM system """ self._ensure_login() message, attributes, children = self._sendrequest("GetSARI") return attributes.get("sari")
[docs] def get_systemname(self): """ Fetches the OMM system name Returns: The OMMs configured system name. """ self._ensure_login() message, attributes, children = self._sendrequest("GetSystemName") return attributes.get("name")
[docs] def get_limits(self): """ Fetches maximum Numbers for RFPs, users ect. Returns: A dict containing all limitations for the different types. """ self._ensure_login() message, attributes, children = self._sendrequest("Limits") return attributes
[docs] def get_versions(self): """ Fetches the OMM supported protocol versions for all calls. Returns: A dict containing protocol versions for all available calls. """ message, attributes, children = self._sendrequest("GetVersions") return attributes
[docs] def set_subscription(self, mode, timeout=None): """ Set DECT Subscription Mode (Modes are: off, configured, wildcard) Args: mode (str): one of the following Modes: * off * configured * wildcard timeout (int): The time after that the wildcard mode disables. It is only required if switching to wildcard mode. Returns: True: if parameters are ok False: if the parameters are faulty """ modes = { "OFF": "Off", "WILDCARD": "Wildcard", "CONFIGURED": "Configured" } if mode is None or mode.upper() not in modes: return False messagedata = { "mode": modes[mode.upper()], } if mode is "Wildcard" and timeout is not None: return False else: messagedata["timeout"] = timeout self._sendrequest("SetDECTSubscriptionMode", messagedata) return True
[docs] def set_user_pin(self, uid, pin): """ rest a user profiles PIN Resets the PIN a user uses to login his user profile into a DECT device. That can be done by using the defined system feature code, the profiles login and this PIN. Args: uid (int): user profile id pin (str): PIN to set Returns: True: if successful False: if the request failed """ messagedata = { "user": { "uid": uid, "pin": encrypt_pin(pin, self._modulus, self._exponent) } } message, attributes, children = self._sendrequest("SetPPUser", {"seq": str(self._get_sequence())}, messagedata) if len(children) > 0 and children["user"] is not None: return True else: return False
[docs] def get_device(self, ppn): """ get device configuration data Args: :param ppn: device id inside OMM :type ppn: int Returns: :rtype: OMMDevice :returns: Device object if successful None if not """ message, attributes, children = self._sendrequest("GetPPDev", {"seq": self._get_sequence(), "ppn": ppn}) if children is not None and "pp" in children and children["pp"] is not None \ and children["pp"]["ppn"] == str(ppn): device = OMMDevice(self, children["pp"]) return device else: return None
[docs] def get_user(self, uid): """ get user configuration data Obtain the user profiles configuration data like sip-login ect. using the user id. Args: uid (int): user profile id Returns: Will return the users profile if the request ist successful. If it fails None will be returned. """ message, attributes, children = self._sendrequest("GetPPUser", {"seq": self._get_sequence(), "uid": uid}) if children is not None and "user" in children and children["user"] is not None \ and children["user"]["uid"] == str(uid): user = OMMUser(self, children["user"]) return user else: return None
[docs] def set_user_relation_dynamic(self, uid): """ Convert a fixed device-user relation into a dynamic one After converting the relation from fixed to dynamic users are able to logout the profile from a device using the DECT feature code. Args: uid (int): user profile id Returns: Will return the user profile's attributes if successful False will be returned if the request failed. """ messagedata = { "seq": self._get_sequence(), "uid": uid, "relType": "Dynamic" } message, attributes, children = self._sendrequest("SetPPUserDevRelation", messagedata) if attributes is not None: return attributes else: return False
[docs] def set_user_relation_fixed(self, uid): """ Convert a user-device relation into fixed type .. note:: Prior to this operation the user profile must be bound to a device. When a user profile is already logged in (bound) to a device using dynamic relationship this method can be used to fix the binding. After that no login and logout method can be performed using the DECT mechanisms. Args: uid (int): user profile id Returns: If successful it will return a dict containing all information about the user profile. Will return False if the request didn't succeed properly. """ messagedata = { "seq": self._get_sequence(), "uid": uid, "relType": "Fixed" } message, attributes, children = self._sendrequest("SetPPUserDevRelation", messagedata) if attributes is not None: return attributes else: return False
[docs] def detach_user_device(self, uid, ppn): """ detaches an user profile from an existing device .. note:: You have to obtain the device id also named ppn and the users id named uid. Can be used to logout a user profile from a device entry. The user can be logged in to another device after that. The device can be used to login another user. Args: uid (int): user profile id ppn (int): registered device id Returns: True if the operation was successful. False if it failed. """ if (type(uid) is not int or type(ppn) is not int) or (ppn <= 0 or uid <= 0): return False messagedata = { "pp": { "uid": 0, "relType": "Unbound", "ppn": ppn }, "user": { "uid": uid, "relType": "Unbound", "ppn": 0 } } message, attributes, children = self._sendrequest("SetPP", {"seq": self._get_sequence()}, messagedata) if children is not None and "pp" in children and children["pp"]["uid"] == str(uid): return True else: return False
[docs] def attach_user_device(self, uid, ppn): """ Connects an existing user profile to an existing subscribed device Args: uid (int): user profile id ppn (int): registered device id Returns: True if the operation was successful. False if it failed. """ if (type(uid) is not int or type(ppn) is not int) or (ppn <= 0 or uid <= 0): return False messagedata = { "pp": { "uid": uid, "relType": "Dynamic", "ppn": ppn }, "user": { "uid": uid, "relType": "Dynamic", "ppn": ppn } } message, attributes, children = self._sendrequest("SetPP", {"seq": self._get_sequence()}, messagedata) if children is not None and "pp" in children and children["pp"]["uid"] == str(uid): return True else: return False
[docs] def ping(self): """ Pings OMM and awaits response """ self._ensure_login() self._sendrequest("Ping", {})
[docs] def create_user(self, name, number, desc1=None, desc2=None, login=None, pin="", sip_user=None, sip_password=None): """ Creates new user This function will create a new user profile without a device relation ship in dynamic mode. It can be used to loing from a device using the feature access code with login and PIN specified. The Feature access code can be configured using OMM. Could be someting like (*1)(4711)(3333). Within the example *1 stands for general feature access prefix 4711 is the code for user login. And 3333 is the extension for which login is requested. User will be prompted for a PIN. .. note:: If no sip user name and sip password is specified number will be used :param name: Name for the user profile (Shown as Name in OMP) :type name: str :param number: number for the user profile (Shown as Number/SIP user name in OMM) :type number: str :param desc1: Description 1 for the new user profile. Can by any string. :type desc1: str :param desc2: Description 2 for the new user profile. Can by any string. :type desc2: str :param login: Login for the use to be used for profile login from DECT or additional ID. :type login: str :param pin: PIN for profile login via DECT. Any non numeric value doesn't make sense. :type pin: str :param sip_user: Username for OMM to register the profile against the configured sip registrar :type sip_user: str :param sip_password: Password for sip register against registrar configured :type sip_password: str :rtype: dict :return: Will return a dict containing data of the new user object if successful. Will return None if it failed. """ children = { "user": { "name": name, "num": number } } if desc1: children["user"]["hierarchy1"] = desc1 if desc2: children["user"]["hierarchy2"] = desc2 if login: children["user"]["addId"] = login if pin: children["user"]["pin"] = encrypt_pin(pin, self._modulus, self._exponent) if sip_user: children["user"]["sipAuthId"] = sip_user if sip_password: children["user"]["sipPw"] = encrypt_pin(sip_password, self._modulus, self._exponent) message, attributes, children = self._sendrequest("CreatePPUser", {"seq": self._get_sequence()}, children) if children is not None and "user" in children: return children["user"] else: return None
[docs] def delete_device(self, ppid): """ Delete a configured handset (pp) .. note:: This operation can not be undone! :param ppid: id of the PP to be deleted (>0) :type ppid: int :return: None """ self._ensure_login() self._sendrequest("DeletePPDev", {"ppn": str(ppid), "seq": str(self._get_sequence())})
[docs] def get_device_state(self, ppn): """ Fetches the current state of a PP Args: :param ppn: id of the PP to get the current state for :type ppn: int Returns: :return: A dict containing the devices state information """ self._ensure_login() message, attributes, children = self._sendrequest("GetPPState", {"ppn": str(ppn), "seq": str(self._get_sequence())}) if children is not None and "pp" in children and children["pp"] is not None \ and children["pp"]["ppn"] == str(ppn): device = OMMDevice(self, children["pp"]) return device else: return None
def _work(self): while not self._terminate: if not self._send_q.empty(): item = self._send_q.get(block=False) self._ssl_socket.send(item + chr(0)) self._send_q.task_done() self._ssl_socket.settimeout(0.1) data = None try: data = self._ssl_socket.recv(65536) except Exception, e: if e.message == "The read operation timed out": continue if data: self._recv_q.put(data) def _dispatch(self): while not self._terminate: sleep(0.1) if not self._recv_q.empty(): item = self._recv_q.get(block=False) message, attributes, children = parse_message(item) if message == "EventDECTSubscriptionMode": self.on_DECTSubscriptionMode(message, attributes, children) continue if "seq" in attributes: message += attributes["seq"] with self._eventlock: if message in self._events: self._events[message]["response"] = item self._events[message]["event"].set()
[docs] def logout(self): """ Logout from OMM Calling this method will log you out and close the underlying tcp/ssl socket. Login can be called any time to reuse the client object for further calls. """ self._logged_in = False self._terminate = True self._worker.join() self._dispatcher.join() self._ssl_socket.close()
def __del__(self): self.logout()