uun_iot library

Subpackages

uun_iot.Gateway module

Main Gateway class which manages all user modules.

class uun_iot.Gateway.Config(gateway_instance, config_file)

Bases: uun_iot.typing.IModule

Configuration module.

The preffered styling of the JSON keys is camelCase, ie. first letter lowercase, letters after spaces uppercase and finally, the spaces removed.

The JSON configuration file has the following example structure:

{
    "gateway": {

        "moduleTimers": {
            "customModule1": {
                "receive": 120,
                "send": 400
            },
            "customModule2": 60
        },

        "moduleBackupStorage": {
            "customModule1": "backup/customModule1.json",
            "customModule2": {
                "path": "backup/customModule2.json",
                "limit": 50
            }
        },

        "customModule2": {
            "option1": "value1"
        }
    },

    "uuApp": { ... },
    "uuThing": { ... },
    "oidcGrantToken": { ... }
}

On the other hand, a minimal configuration file is an empty JSON file:

{}

Meaningful keys and subkeys are:

  • gateway: optional. Main configuration for this IoT application

    • moduleTimers: optional. Core functionality, dictionary with periods (in seconds) for @on("tick") events for corresponding modules. The module IDs are keys, the periods are values.

      • Multiple timers can be specified by introducing a subobject with the timer ID and the timer period. The timer ID corresponds to the event defined in Python code as @on("tick", "timerId").

    • moduleBackupStorage: optional. Applies to modules based on Module. The format is module ID as key and file path as value. This key is used to specify location to which should unsent data from the module be saved, see Module for more information.

      • You can specify additional information, such that the storage should be limited in size. For this, specify the size of the storage in number of entries in limit and add the original file path in the path key.

    • <moduleId>: optional. A default place for the configuration specific to the module with ID moduleId. The structure is arbitrary and depends on your needs.

  • keys for uun_iot.UuAppClient. Optional. See documentation there for more information. If you want to use secure communication with uuApp, specify the details in keys

    • uuApp

    • uuThing

    • oidcGrantToken

Parameters:
id: str = 'config'
on_tick()

Gets new configuration from the uuApp, validates it, restarts timers (if needed based on the new configuration) and saves the new configuration locally. It also notifies modules (via their @on("update")) about configuration update. It is triggered by a tick event.

register_callbacks(modules)

Register optional update event callbacks with Config.

class uun_iot.Gateway.Gateway(config_file, module_init=None)

Bases: contextlib.AbstractContextManager

Main Gateway class which manages all user modules.

Gateway is responsible for managing all modules and also manages event dispatch and configuration.

This class is a ContextManager. On enter, the start() method is called, on leave, the stop() method is called.

Warning

When an exception is thrown inside the module_init function (usually calling class constructors), it is not supressed and leads to classical exception behaviour (ie. program termination).

When an exception is thrown inside a on() event hook, the exception is catched and printed via logging.

Examples

from uun_iot import Gateway:
config_file="config.json"
with Gateway(config_file) as g:
    while True:
        time.sleep(1)

In this example, the Gateway is initialized using the configuration file in a with block. Upon entering, the gateway is started. When an exeption occurs, the gateway is gracefully stopped using its stop() method – this allows the Gateway to do cleanup and inform user modules (here are none) that they should exit, too.

Parameters:
config: dict

dictionary storing the gateway’s configuration see Config.config

runev: threading.Event

This Event is set when the Gateway start()-s and cleared when stop()-s.

signal_handler(sig, frame, additional_cleanup=None)

Handler for predefined signals.

The following signals are supported:

functionality no.

description

1

stop() Gateway, del ete all modules, run additional_cleanup and exit

signal

functionality

SIGINT

1

SIGTERM

1

SIGUSR1

1 and exit with error code 1

SIGUSR2

1

The signals need to be explicitly registered with this method being the associated handler. Register signals as:

import signal
from uun_iot import Gateway
with Gateway(...) as g:
    signal.signal(signal.SIGTERM, g.signal_handler)
    ...

If you want to specify additional_cleanup, define a partial function from this method with first two arguments left empty.

Parameters:
  • sig (signal.Signals) – caught signal

  • frame – exception frame

  • additional_cleanup (Callable[[uun_iot.Gateway.Gateway], None]) – optional function to be run to cleanup the Gateway. The function takes the Gateway instance as an argument.

start()

Invoke all @on("start") functions and start all timers.

This is no-op when the gateway is already started.

stop()

Invoke all @on("stop") functions and peacefully stop (wait to finish) all timers and associated @on("tick") handlers.

This is no-op when the gateway is already stopped.

stopev: threading.Event

This Event is set when the Gateway stop()-s and cleared when start()-s.

uuapp_client: uun_iot.UuAppClient.UuAppClient
class uun_iot.Gateway.ModuleTimer(module_instance, f, period)

Bases: object

Store IModule instance together with its tick event timer(s).

f and period are of type function and float respectively:

  • f: function to be called on each timer hit (module’s method)

  • period: period of the timer in seconds,

or f and period can be either of type dict and dict respectively, indexed by the same keys, the keys correspond to timer IDs:

  • f: values of the dictionary are the module_instance’s method

  • period: values are timer periods in seconds.

Parameters:
  • module_instance (uun_iot.typing.IModule) – an instantiated IModule (for which the timer is created)

  • f (Callable | Dict[str, Callable]) – method of module_instance object to be repeatedly called on tick event

  • period (float | int | Dict[str, float | int]) – the tick period in seconds

Raises:

ValueError – argument type mismatch for f and period, see above.

id: str

ID of passed module

m_instance: uun_iot.typing.IModule
multi: bool

does timer have multiple subtimers?

timer: uun_iot.utils.RepeatTimer | Dict[str, uun_iot.utils.RepeatTimer]

uun_iot.UuAppClient module

Communication with the *.Main uuSubApp.

class uun_iot.UuAppClient.UuAppClient(config, fn_token_request=<function post>, refresh_token_on_init=True, token_leeway=60)

Bases: object

Library functions for communication and authentication with uuApp.

The class gets a dictionary config containing the configuration of the application. The (root) keys and subkeys of interest to this class are

  • uuApp: information about server uuApp

    • gateway: domain gateway of the corresponding uuApp

    • uuAppName: full name of the uuApp

    • awid

    • uuCmdList: containing uuCmd endpoint. The keys are not used by UuAppClient class - they are used by the application user modules (most commonly specified in Python package path <package_name>/modules/__init__.py.

  • uuThing: authentication information about the IoT gateway

    • uuIdentity

    • accessCode1

    • accessCode2

  • oidcGrantToken: information about the authentication token service used in server communication

    • gateway: (usually uuidentity.plus4u.net) domain gateway of the OIDC service

    • uuAppName: (usually uu-oidc-maing02) full name of the OIDC uuApp

    • awid

    • clientId

    • clientSecret

    • uuCmd: (usually oidc/grantToken) uuCmd enpoint for token granting

    • tokenPath: (usually ./oidc-token) filesystem location (on the IoT gateway) where the token will be saved

Parameters:
  • config (dict) – dictionary with configuration, see above

  • fn_post_request – optional function to get a token response from server, defaults to requests.post()

  • refresh_token_on_init (bool) – call refresh_token() on initialization of this class

  • token_leeway (float) – leeway (drift) between Client and Server Clock [s], defaults to 60 s fresh token is requested when token is to be expired in less than token_leeway [s] the lower the leeway, the more often is token going to be requested the token validity time is controlled by the server, this option accounts only for drift

  • fn_token_request (Callable[[...], requests.models.Response]) –

custom_request(custom_send_f, dto_in=None, label=None, log_level=30)

Custom request wrapper for catching requests-related and token-acquiring-related Exceptions. Token errors, request exceptions, including HTTP non-success status codes, are suppressed, logged with specified level and returned.

Supply your own custom_send_f function (taking dto_in as only argument) and outputting a requests.Response. To make an authenticated request, you should use authentication headers available through get_auth_headers(). Note that you can specify the headers either directly during definition, or during evaluation of custom_send_f. The latter is preferred due to handling TokenErrors in a unified way with other exceptions. Note that token acquisition is not in scope of custom_request() and has to be supplied by caller.

See also get_uucmd_url() and get_auth_headers() for constructing own function.

Note

Example for custom_send_f follows.

uucmd = "gateway/heartbeat"
full_url = uuappclient.get_uucmd_url(uucmd)
def handler(dto_in):
    return requests.get(full_url, headers=self.get_auth_headers(), json=dto_in_, timeout=20)
uuappclient.custom_request(handler, {"key": "value"}, uucmd)
This function will
  • call custom_send_f with dto_in positional argument,

  • raise for non-success HTTP status codes

  • catch requests.HTTPError and requests.RequestException,

  • log the exceptions and

  • returns response along with the exception

Parameters:
  • custom_send_f (Callable[[object], requests.models.Response]) – custom send function which is used instead of requests.get() label: label used in log messages to identify the request

  • dto_in (object | None) – dictionary data to send. If None, pass {} to custom_send_f.

  • log_level (int) – optional logging level for possible exceptions, defaults to logging.WARNING

  • label (str | None) –

Return type:

Tuple[requests.models.Response | None, uun_iot.exceptions.TokenError | requests.exceptions.RequestException | None]

Returns: Tuple (response, exc),
  • requests.Response response, None if non-HTTP (not 200 HTTP status code) exception was raised

  • exc: requests.RequestException or TokenError if the same exception was raised during request. None if no exception during request was raised

Raises: None

get(uucmd, dto_in=None, log_level=30)

Get request using an authenticated Bearer token. Token errors, request exceptions, including HTTP non-success status codes, are suppressed, logged with specified level and returned. See custom_request() for more informaton.

Parameters:
  • uucmd (str) – UuCmd in relative path format, eg. gateway/heartbeat

  • dto_in (dict | None) – data to send

  • log_level (int) – optional logging level for possible exceptions, defaults to logging.WARNING

Returns:

Tuple (response, exc),
  • requests.Response response, None if non-HTTP (not 200 HTTP status code) exception was raised

  • requests.RequestException exc is None if no exception during request was raised

Return type:

Tuple[requests.models.Response | None, uun_iot.exceptions.TokenError | requests.exceptions.RequestException | None]

get_auth_headers(content_type='application/json')

Get request headers with Bearer authentication. The token is refreshed automatically, if expired.

Headers also includes specified Content-type and uun-iot’s User-Agent.

Parameters:

content_type (str) –

get_ignore_http_err(uucmd, dto_in=None, log_level=30)

GET request with authentication. Ignore HTTP errors, raise errors for other network errors. Logs all errors (including HTTP non-200) with log_level severity. See custom_request() for more informaton.

Parameters:
  • uucmd (str) – uuCmd path of the target uuApp

  • dto_in (dict | None) – data to be passed as JSON input to the uuApp

  • log_level (int) – logging library level of network error, default logging.WARNING

Return type:

requests.models.Response

Raises: requests.RequestException or TokenError

get_uucmd_url(uucmd)

Return fully quallified URL from relative uucmd string.

Parameters:

uucmd (str) – uucmd in relative path, eg. gateway/getWeather

Return type:

str

is_token_expired()

# view with respect to server-centered time # <-> leeway # ++++++==|—– # | # server_token_expire # # +: when (local) now is there, token is valid # =: when (local) now is there, token is valid but apply for a new token # because of the possible leeway between client and server clocks # -: when (local) now is there, token is invalid

Return type:

bool

multipart(uucmd, dto_in=None, log_level=30)

POST request with authentication and MULTIPART encoded data with oidc2 authentication. Useful for sending binary data (images, …). See https://toolbelt.readthedocs.io/en/latest/user.html#multipart-form-data-encoder for information about multipart encoder.

Pass data to dto_in, they will be transformed using MultipartEncoder and passed to UuAppClient.post() (see for more information and usage).

Parameters:
  • uucmd (str) –

  • dto_in (dict | None) –

  • log_level (int) –

Return type:

Tuple[requests.models.Response | None, uun_iot.exceptions.TokenError | requests.exceptions.RequestException | None]

post(uucmd, dto_in=None, log_level=30)

POST request using an authenticated Bearer token. Request exceptions, including HTTP non-success status codes, are suppressed, logged with specified level and returned. See custom_request() for more informaton.

Parameters:
  • uucmd (str) – UuCmd in relative path format, eg. gateway/heartbeat

  • dto_in (dict | requests_toolbelt.multipart.encoder.MultipartEncoder | None) – data to send, dictionary or MultipartEncoder data

  • log_level (int) – optional logging level for possible exceptions, defaults to logging.WARNING

Returns:

Tuple (response, exc),
  • requests.Response response, None if non-HTTP (not 200 HTTP status code) exception was raised

  • requests.RequestException exc is None if no exception during request was raised

Return type:

Tuple[requests.models.Response | None, uun_iot.exceptions.TokenError | requests.exceptions.RequestException | None]

Raises: ValueError if dto_in type is not dict or MultipartEncoder

post_ignore_http_err(uucmd, dto_in=None, log_level=30)

POST request with authentication. Ignore HTTP errors, raise errors for other network errors. Logs all errors (including HTTP non-200) with log_level severity.

Parameters:
  • uucmd (str) – uuCmd path of the target uuApp

  • dto_in (dict | requests_toolbelt.multipart.encoder.MultipartEncoder | None) – data to be passed as JSON input to the uuApp

  • log_level (int) – logging library level of network error, default logging.WARNING

Return type:

requests.models.Response

Raises: requests.RequestException or TokenError

refresh_token()

Refresh token if expired. If authentication is disabled, noops and returns False.

Returns False if token refresh was not needed - that is, if token is already loaded and valid. Refresh token and return True otherwise.

Raises:
  • TokenError – when a valid token could not be obtained due to network or server error

  • TokenCommandError (subclass of TokenError) – server-side uuApp returned error

Return type:

bool

class uun_iot.UuAppClient.UuCmdSession(uuclient, uucmd, log_level=30)

Bases: object

Send all data with UuCmd in one session to avoid creating multiple connections.

Parameters:
  • uuclient (uun_iot.UuAppClient.UuAppClient) – initialized authentication client

  • uucmd (str) – UuCmd in relative path format, eg. gateway/getWeather

  • log_level (int) – logging library log level to use for network and HTTP errors, defaults to logging.WARNING

get(data=None, log_level=30)

Authenticated GET request with session. See UuAppClient.get() for more information.

Parameters:
  • data (dict | None) –

  • log_level (int) –

Return type:

Tuple[requests.models.Response | None, uun_iot.exceptions.TokenError | requests.exceptions.RequestException | None]

post(data=None, log_level=30)

Authenticated POST request with session. See UuAppClient.post() for more information.

Parameters:
  • data (dict | None) –

  • log_level (int) –

Return type:

Tuple[requests.models.Response | None, uun_iot.exceptions.TokenError | requests.exceptions.RequestException | None]

update_headers()

Update session auth headers ensured to containing a valid Bearer token.

uun_iot.decorators module

Event handler registration using @on decorator.

uun_iot.decorators.on(*outer_args)

Decorator for event handler registration.

Synopsis: on(event[,id])

Supported events: tick, update, start, stop.

event

description

update

gateway’s configuration was updated

tick

timer tick (configured in JSON)

start

Gateway just started (using start())

stop

Gateway is stopping (using stop()), end module’s action as soon as possible

  • tick event can take another argument to specify the timer id (see gateway.moduleTimers key in configuration JSON), ie. on(fn), or on(fn, "timerId") with fn() being the handler function.

Handlers registered for the corresponding event with this decorator will be called on corresponding event by Gateway or Config objects. Passed arguments are different for each event and the methods have to take these arguments. Note: self denotes method’s module instance, origin indicates which module initiates the event

event

handler synopsis

origin

update

handler(self)

Config

tick

handler(self)

Gateway

start

handler(self, evs)

Gateway

stop

handler(self)

Gateway

where evs = (g.runev, g.stopev) is a tuple of threading.Event attributes runev and stopev. Here, g is the corresponding Gateway instance.

Note

In a typical use case, @on decorators are invoked on method/class definition, not at run-time. This can be seen on examples below.

Examples

  • timer event without ID

    • configuration:

      {
          "gateway": {
              "moduleTimers": {
                  "timerModule": 1
              }
          }
      }
      
    from uun_iot import on, Module
    class TimerModule(Module):
        @on("tick")
        def periodical(self):
            print("Tick tock every 1 s.")
    
  • timer event with ID

    • configuration

      {
          "gateway": {
              "moduleTimers": {
                  "sendReceive": {
                      "send": 2,
                      "get": 1
                  }
              }
          }
      }
      
    class SendReceive(Module):
        @on("tick", "get")
        def get(self):
            print(f"Retrieving data...")
    
        @on("tick", "send")
        def send(self):
            print(f"Sending data...")
    
  • start event

    • configuration:

      {
          "gateway": {}
      }
      
    class AdvancedDataMeasurement(Module):
        @on("start")
        def oneshot(self, evs):
            runev, stopev = evs
            while runev.is_set():
                print("Polling for voltage reading from voltmeter...")
                data = 53.8
                if data > 50:
                    time.sleep(1)
                else:
                    time.sleep(1.5)
                    print(data)
    

Warning

Dev note TODO. Is method unbounding needed? Why is the decorated method being unbound here in the first place?

Parameters:
  • event (str) – one of tick, update, start, stop

  • id (str) – optional, ID of the corresponding event, when more are specified in configuration JSON

uun_iot.diagnostic module

class uun_iot.diagnostic.DiagnosticEvent(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: uun_iot.diagnostic.GeneralDiagnosticEvent

General diagnostic events.

DATA_RECEIVED = 7

Module received valid data.

DATA_SEND_ATTEMPTED = 12

Module could not send any data due to an error.

DATA_SEND_FAIL = 11

Module could not send any data due to an error.

DATA_SEND_IMMINENT = 8

Module is going to send data. Use directly in front of self._send_storage() call.

DATA_SEND_OK = 9

Module has sent all data successfully.

DATA_SEND_PARTIAL = 10

Module has sent some data successfully, but some data were not sent.

DIAGNOSTICS_START = 1

Start of diagnostics.

DIAGNOSTICS_STOP = 2

Stop of diagnostics.

ON_START_CALL = 4

Registered @on(start) method is called.

ON_STOP_CALL = 5

Registered @on(stop) method is called.

ON_UPDATE_CALL = 6

Registered @on(update) method is called.

TIMER_CALL = 3

Registered @on(tick) method is called. Specify timer ID in custom argument for better debugging purposes.

class uun_iot.diagnostic.GatewayDiagnosticEvent(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: uun_iot.diagnostic.GeneralDiagnosticEvent

Events tied directly to the Gateway instance.

GATEWAY_SIGNAL = 3
GATEWAY_START = 1

Gateway was stopped.

GATEWAY_STOP = 2

Gateway received a signal. Specify signal type in custom argument for better debugging purposes.

class uun_iot.diagnostic.GeneralDiagnosticEvent(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: enum.IntEnum

Base class for diagnostic events specified by uun-iot library or user applications. Diagnostic events are used to notify HealthCheck about some important events in via HealthCheck.notify. If user application needs more diagnostic events, inherit from this class.

uun_iot.typing module

Types and interfaces frequently used in uun-iot library and apps.

class uun_iot.typing.IModule

Bases: abc.ABC

Module interface. All user modules must inherit from this interface.

abstract property id: str

uun_iot.utils module

class uun_iot.utils.LoggingSystemdHandler(stream=None)

Bases: logging.StreamHandler

Severity information for stdout&stderr logging. See SD-DAEMON(3)

PREFIX = {0: '<7>', 10: '<7>', 20: '<6>', 30: '<4>', 40: '<3>', 50: '<2>'}
emit(record)

Emit a record.

If a formatter is specified, it is used to format the record. The record is then written to the stream with a trailing newline. If exception information is present, it is formatted using traceback.print_exception and appended to the stream. If the stream has an ‘encoding’ attribute, it is used to determine how to do the output to the stream.

class uun_iot.utils.RepeatTimer(period, f, args=[], kwargs={}, runonstart=True)

Bases: object

Periodically run function with unlimited repetition.

Start the timer by executing start(). The timer can be interrupted at any time by issuing stop().

It is possible to stop() the timer, directly set different period or runonstart and start() again. But be sure to STOP before changing these values. Any exceptions raised in f will be logged with ERROR log level using logging library together with stacktrace. This will NOT stop the timer.

Warning

Any exception occuring in the function f is catched and logged, ie. exceptions occuring in the function will not stop the timer.

Parameters:
  • period (float | int) – period between timer ticks, in seconds

  • f (Callable) – function to be executed

  • args (List) – positional arguments to be passed to f

  • kwargs (Dict) – keyword arguments to be passed to f

  • runonstart (bool) – default to True – make the first execution of f right after calling start() (without waiting first for period)

property period: float | int
property runonstart: bool
start()

Start the timer.

stop(block=True)

Stop the timer.

Can be set to block until current execution of f is finished.

Parameters:

block (bool) – If block is True, block until currently executed function is finished. Default to True.

class uun_iot.utils.Storage(storage_id, backup_path, maxlen=0)

Bases: object

Storage providing thread-safe utility functionality.

This class provides:

  • basic thread-safe storage centered around Python’s list

  • backup and restore the storage to JSON files

  • limiting the storage to a number of entries. If enabled, the oldest entries are discarded. Newest entries are the ones stored at the end of the list.

    • as the limited storages are typically small in size (~100), no optimization is done for the sake of code simplicity. The list’s removal from the left is O(n) where n is the size of the storage. If you need bigger limited storages, consider double ended queue deque.

Parameters:
  • storage_id (str) – identifier used only for logging purposes

  • backup_path (str | None) – location of backup JSON file. Pass None to turn off file backups. It will automatically create empty directories and create the backup file, if the path does not exist. If it exists, load data into storage from file.

  • maxlen (int) – maximum number of entries in the storage. Oldest entries are replaced by newer ones. Pass 0 to make the storage unlimited.

append(data, backup=False)

Append a piece of new data to the storage list. Thread-safe.

There is an option to backup the storage directly after the append. Default is to not back up to file, because of the unnecessary IO overhead.

Parameters:
  • data (Any) – data to be appended using self.data.append()

  • backup (bool) – backup the storage to the file after appending. Defaults to False.

Return type:

None

property data: List

The stored data list.

empty()

Empty the storage and empty the backup file.

Return type:

None

is_empty()

Is the storage an empty list []?

Return type:

bool

merge(data, new_data=True, backup=True)

Merge another list with the storage. Thread-safe.

Chronologically merge data with storage. Defaults to file backup after the operation, if enabled in constructor.

Parameters:
  • data (List) – another list to merge with the data.

  • new_data (bool) – Value True means data are newer than whole content of the storage and are stored at the end of the storage. This is assumed by default. Value False merges the data at the beginning of the storage. This is important when storage limit is imposed, as the oldest entries will be deleted when the total number of entries is greater that the limit.

  • backup (bool) – backup the storage to the file after appending. Defaults to True.

Return type:

None

trim()

Trim the storage size to the limit limit.

Only limit last entries of the storage are kept, rest is deleted.

Do not trim if the limit is not set (is zero).

Return type:

None

write_lock: _thread.allocate_lock

writing lock threading.Lock can be used for more complex operations outside this class and is public

uun_iot.utils.file_save(path, data)

Save data to file in JSON.

Parameters:
  • path (str) – file path

  • data (List | Dict) – dictionary or list with data

Returns:

True if saved, False if file already contains the same data as passed

Return type:

bool

uun_iot.utils.get_datetime_from_iso_timestamp(iso_timestamp)

Create datetime object from given ISO timetamp .

Parameters:

iso_timestamp (str) – ISO timestamp

Returns:

corresponding datetime.datetime object

Return type:

datetime

uun_iot.utils.get_iso_timestamp(dt)

Create ISO timetamp from given datetime object.

Parameters:

dt (datetime.datetime) – datetime.datetime object

Returns:

corresponding ISO timestamp

Return type:

str

uun_iot.utils.module_id(module)

Create module ID from module itself

Call module_id_from_str() on name of given module.

Parameters:

module – instance of module

Returns:

module ID is created from module.__name__ attribute

Return type:

str

uun_iot.utils.module_id_from_str(module_name)

Create module ID from module name string.

The format is: first letter lowercase.

Examples

  • Hearbeat -> heartbeat

  • WeatherConditions -> weatherConditions

Parameters:

module_name (str) – module name

Returns:

formatted module ID

Return type:

str

Module contents