Source code for ricxappframe.xapp_frame

# ==================================================================================
#       Copyright (c) 2020 Nokia
#       Copyright (c) 2020 AT&T Intellectual Property.
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#          http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
# ==================================================================================
"""
Framework for python xapps
Framework here means Xapp classes that can be subclassed
"""

import queue
from threading import Thread
from ricxappframe import xapp_rmr
from ricxappframe.rmr import rmr
from ricxappframe.xapp_sdl import SDLWrapper
from mdclogpy import Logger

# constants
RIC_HEALTH_CHECK_REQ = 100
RIC_HEALTH_CHECK_RESP = 101


# Private base class; not for direct client use


class _BaseXapp:
    """
    Base xapp; not for client use directly
    """

    def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
        """
        Init

        Parameters
        ----------
        rmr_port: int
            port to listen on

        rmr_wait_for_ready: bool (optional)

            if this is True, then init waits until rmr is ready to send, which
            includes having a valid routing file. This can be set to
            False if the client only wants to *receive only*.

        use_fake_sdl: bool (optional)
            if this is True, it uses dbaas' "fake dict backend" instead
            of Redis or other backends. Set this to true when developing
            your xapp or during unit testing to completely avoid needing
            a dbaas running or any network at all.

        post_init: function (optional)
            runs this user provided function after the base xapp is
            initialized; its signature should be post_init(self)
        """
        # PUBLIC, can be used by xapps using self.(name):
        self.logger = Logger(name=__name__)

        # Start rmr rcv thread
        self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
        self._mrc = self._rmr_loop.mrc  # for convenience

        # SDL
        self._sdl = SDLWrapper(use_fake_sdl)

        # run the optionally provided user post init
        if post_init:
            post_init(self)

    # Public rmr methods

    def rmr_get_messages(self):
        """
        Returns a generator iterable over all items in the queue that
        have not yet been read by the client xapp. Each item is a tuple
        (S, sbuf) where S is a message summary dict and sbuf is the raw
        message. The caller MUST call rmr.rmr_free_msg(sbuf) when
        finished with each sbuf to prevent memory leaks!
        """
        while not self._rmr_loop.rcv_queue.empty():
            (summary, sbuf) = self._rmr_loop.rcv_queue.get()
            yield (summary, sbuf)

    def rmr_send(self, payload, mtype, retries=100):
        """
        Allocates a buffer, sets payload and mtype, and sends

        Parameters
        ----------
        payload: bytes
            payload to set
        mtype: int
            message type
        retries: int (optional)
            Number of times to retry at the application level before excepting RMRFailure

        Returns
        -------
        bool
            whether or not the send worked after retries attempts
        """
        sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)

        for _ in range(retries):
            sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
            if sbuf.contents.state == 0:
                self.rmr_free(sbuf)
                return True

        self.rmr_free(sbuf)
        return False

    def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
        """
        Allows the xapp to return to sender, possibly adjusting the
        payload and message type before doing so.  This does NOT free
        the sbuf for the caller as the caller may wish to perform
        multiple rts per buffer. The client needs to free.

        Parameters
        ----------
        sbuf: ctypes c_void_p
             Pointer to an rmr message buffer
        new_payload: bytes (optional)
            New payload to set
        new_mtype: int (optional)
            New message type (replaces the received message)
        retries: int (optional)
            Number of times to retry at the application level before
            throwing exception RMRFailure

        Returns
        -------
        bool
            whether or not the send worked after retries attempts
        """
        for _ in range(retries):
            sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
            if sbuf.contents.state == 0:
                return True

        self.logger.info("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
        return False

    def rmr_free(self, sbuf):
        """
        Frees an rmr message buffer after use

        Note: this does not need to be a class method, self is not
        used. However if we break it out as a function we need a home
        for it.

        Parameters
        ----------
        sbuf: ctypes c_void_p
             Pointer to an rmr message buffer
        """
        rmr.rmr_free_msg(sbuf)

    # SDL
    # NOTE, even though these are passthroughs, the separate SDL wrapper
    # is useful for other applications like A1. Therefore, we don't
    # embed that SDLWrapper functionality here so that it can be
    # instantiated on its own.

    def sdl_set(self, ns, key, value, usemsgpack=True):
        """
        Stores a key-value pair,
        optionally serializing the value to bytes using msgpack.

        Parameters
        ----------
        ns: string
            SDL namespace
        key: string
            SDL key
        value:
            Object or byte array to store.  See the `usemsgpack` parameter.
        usemsgpack: boolean (optional, default is True)
            Determines whether the value is serialized using msgpack before storing.
            If usemsgpack is True, the msgpack function `packb` is invoked
            on the value to yield a byte array that is then sent to SDL.
            Stated differently, if usemsgpack is True, the value can be anything
            that is serializable by msgpack.
            If usemsgpack is False, the value must be bytes.
        """
        self._sdl.set(ns, key, value, usemsgpack)

    def sdl_get(self, ns, key, usemsgpack=True):
        """
        Gets the value for the specified namespace and key,
        optionally deserializing stored bytes using msgpack.

        Parameters
        ----------
        ns: string
            SDL namespace
        key: string
            SDL key
        usemsgpack: boolean (optional, default is True)
            If usemsgpack is True, the byte array stored by SDL is deserialized
            using msgpack to yield the original object that was stored.
            If usemsgpack is False, the byte array stored by SDL is returned
            without further processing.

        Returns
        -------
        Value
            See the usemsgpack parameter for an explanation of the returned value type.
            Answers None if the key is not found.
        """
        return self._sdl.get(ns, key, usemsgpack)

    def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
        """
        Gets all key-value pairs in the specified namespace
        with keys that start with the specified prefix,
        optionally deserializing stored bytes using msgpack.

        Parameters
        ----------
        ns: string
           SDL namespace
        prefix: string
            the key prefix
        usemsgpack: boolean (optional, default is True)
            If usemsgpack is True, the byte array stored by SDL is deserialized
            using msgpack to yield the original value that was stored.
            If usemsgpack is False, the byte array stored by SDL is returned
            without further processing.

        Returns
        -------
        Dictionary of key-value pairs
            Each key has the specified prefix.
            The value object (its type) depends on the usemsgpack parameter,
            but is either a Python object or raw bytes as discussed above.
            Answers an empty dictionary if no keys matched the prefix.
        """
        return self._sdl.find_and_get(ns, prefix, usemsgpack)

    def sdl_delete(self, ns, key):
        """
        Deletes the key-value pair with the specified key in the specified namespace.

        Parameters
        ----------
        ns: string
           SDL namespace
        key: string
            SDL key
        """
        self._sdl.delete(ns, key)

    # Health

    def healthcheck(self):
        """
        this needs to be understood how this is supposed to work
        """
        return self._rmr_loop.healthcheck() and self._sdl.healthcheck()

    def stop(self):
        """
        cleans up and stops the xapp rmr thread (currently). This is
        critical for unit testing as pytest will never return if the
        thread is running.

        TODO: can we register a ctrl-c handler so this gets called on
        ctrl-c? Because currently two ctrl-c are needed to stop.
        """
        self._rmr_loop.stop()


# Public Classes to subclass (these subclass _BaseXapp)


[docs]class RMRXapp(_BaseXapp): """ Represents an Xapp that reacts only to RMR messages; i.e., when messages are received, the Xapp does something. When run is called, the xapp framework waits for RMR messages, and calls the appropriate client-registered consume callback on each. Parameters ---------- default_handler: function A function with the signature (summary, sbuf) to be called when a message type is received for which no other handler is registered. default_handler argument summary: dict The RMR message summary, a dict of key-value pairs default_handler argument sbuf: ctypes c_void_p Pointer to an RMR message buffer. The user must call free on this when done. rmr_port: integer (optional, default is 4562) Initialize RMR to listen on this port rmr_wait_for_ready: boolean (optional, default is True) Wait for RMR to signal ready before starting the dispatch loop use_fake_sdl: boolean (optional, default is False) Use an in-memory store instead of the real SDL service post_init: function (optional, default None) Run this function after the app initializes and before the dispatch loop starts; its signature should be post_init(self) """ def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None): """ Also see _BaseXapp """ # init base super().__init__( rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init ) # setup callbacks self._default_handler = default_handler self._dispatch = {} # used for thread control self._keep_going = True # register a default healthcheck handler # this default checks that rmr is working and SDL is working # the user can override this and register their own handler # if they wish since the "last registered callback wins". def handle_healthcheck(self, summary, sbuf): ok = self.healthcheck() payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n" self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP) self.rmr_free(sbuf) self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)
[docs] def register_callback(self, handler, message_type): """ registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type Parameters ---------- handler: function a function with the signature (summary, sbuf) to be called when a message of type message_type is received summary: dict the rmr message summary sbuf: ctypes c_void_p Pointer to an rmr message buffer. The user must call free on this when done. message:type: int the message type to look for Note if this method is called multiple times for a single message type, the "last one wins". """ self._dispatch[message_type] = handler
[docs] def run(self, thread=False): """ This function should be called when the reactive Xapp is ready to start. After start, the Xapp's handlers will be called on received messages. Parameters ---------- thread: bool (optional, default is False) If False, execution is not returned and the framework loops forever. If True, a thread is started to run the queue read/dispatch loop and execution is returned to caller; the thread can be stopped by calling the .stop() method. """ def loop(): while self._keep_going: try: (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=5) # dispatch func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None) if not func: func = self._default_handler func(self, summary, sbuf) except queue.Empty: # the get timed out pass if thread: Thread(target=loop).start() else: loop()
[docs] def stop(self): """ Sets the flag to end the dispatch loop. """ super().stop() self.logger.debug("Setting flag to end framework work loop.") self._keep_going = False
[docs]class Xapp(_BaseXapp): """ Represents a generic Xapp where the client provides a function for the framework to call, which usually contains a loop-forever construct. Parameters ---------- entrypoint: function This function is called when the Xapp class's run method is invoked. The function signature must be just function(self) rmr_port: integer (optional, default is 4562) Initialize RMR to listen on this port rmr_wait_for_ready: boolean (optional, default is True) Wait for RMR to signal ready before starting the dispatch loop use_fake_sdl: boolean (optional, default is False) Use an in-memory store instead of the real SDL service """ def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False): """ Parameters ---------- For the other parameters, see class _BaseXapp. """ # init base super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl) self._entrypoint = entrypoint
[docs] def run(self): """ This function should be called when the general Xapp is ready to start. """ self._entrypoint(self)
# there is no need for stop currently here (base has, and nothing # special to do here)