telemetry-websocket.service.js 11.7 KB
/*
 * Copyright © 2016-2020 The Thingsboard Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import 'angular-websocket';
import thingsboardTypes from '../common/types.constant';

export default angular.module('thingsboard.api.telemetryWebsocket', [thingsboardTypes])
    .factory('telemetryWebsocketService', TelemetryWebsocketService)
    .name;

const RECONNECT_INTERVAL = 2000;
const WS_IDLE_TIMEOUT = 90000;

const MAX_PUBLISH_COMMANDS = 10;

/*@ngInject*/
function TelemetryWebsocketService($rootScope, $websocket, $timeout, $window, $mdUtil, $log, toast, types, userService) {

    var isOpening = false,
        isOpened = false,
        isActive = false,
        isReconnect = false,
        reconnectSubscribers = [],
        lastCmdId = 0,
        subscribers = {},
        subscribersCount = 0,
        commands = {},
        cmdsWrapper = {
            tsSubCmds: [],
            historyCmds: [],
            attrSubCmds: []
        },
        telemetryUri,
        dataStream,
        location = $window.location,
        socketCloseTimer,
        reconnectTimer;

    var port = location.port;
    if (location.protocol === "https:") {
        if (!port) {
            port = "443";
        }
        telemetryUri = "wss:";
    } else {
        if (!port) {
            port = "80";
        }
        telemetryUri = "ws:";
    }
    telemetryUri += "//" + location.hostname + ":" + port;
    telemetryUri += "/api/ws/plugins/telemetry";

    var service = {
        subscribe: subscribe,
        unsubscribe: unsubscribe
    }

    $rootScope.telemetryWsLogoutHandle = $rootScope.$on('unauthenticated', function (event, doLogout) {
        if (doLogout) {
            reset(true);
        }
    });

    $rootScope.telemetryWsLoginHandle = $rootScope.$on('authenticated', function () {
        reset(true);
    });

    return service;

    function publishCommands () {
        while(isOpened && hasCommands()) {
            dataStream.send(preparePublishCommands()).then(function () {
                checkToClose();
            });
        }
        tryOpenSocket();
    }

    function hasCommands() {
        return cmdsWrapper.tsSubCmds.length > 0 ||
            cmdsWrapper.historyCmds.length > 0 ||
            cmdsWrapper.attrSubCmds.length > 0;
    }

    function preparePublishCommands() {
        var preparedWrapper = {};
        var leftCount = MAX_PUBLISH_COMMANDS;
        preparedWrapper.tsSubCmds = popCmds(cmdsWrapper.tsSubCmds, leftCount);
        leftCount -= preparedWrapper.tsSubCmds.length;
        preparedWrapper.historyCmds = popCmds(cmdsWrapper.historyCmds, leftCount);
        leftCount -= preparedWrapper.historyCmds.length;
        preparedWrapper.attrSubCmds = popCmds(cmdsWrapper.attrSubCmds, leftCount);
        return preparedWrapper;
    }

    function popCmds(cmds, leftCount) {
        var toPublish = Math.min(cmds.length, leftCount);
        if (toPublish > 0) {
            return cmds.splice(0, toPublish);
        } else {
            return [];
        }
    }

    function onError (errorEvent) {
        if (errorEvent) {
            //showWsError(0, errorEvent);
            $log.warn('WebSocket error event', errorEvent);
        }
        isOpening = false;
    }

    function onOpen () {
        isOpening = false;
        isOpened = true;
        if (reconnectTimer) {
            $timeout.cancel(reconnectTimer);
            reconnectTimer = null;
        }
        if (isReconnect) {
            isReconnect = false;
            for (var r=0; r<reconnectSubscribers.length;r++) {
                var reconnectSubscriber = reconnectSubscribers[r];
                if (reconnectSubscriber.onReconnected) {
                    reconnectSubscriber.onReconnected();
                }
                subscribe(reconnectSubscriber);
            }
            reconnectSubscribers = [];
        } else {
            publishCommands();
        }
    }

    function onClose (closeEvent) {
        if (closeEvent && closeEvent.code > 1000 && closeEvent.code !== 1006) {
            showWsError(closeEvent.code, closeEvent.reason);
        }
        isOpening = false;
        isOpened = false;
        if (isActive) {
            if (!isReconnect) {
                reconnectSubscribers = [];
                for (var id in subscribers) {
                    var subscriber = subscribers[id];
                    if (reconnectSubscribers.indexOf(subscriber) === -1) {
                        reconnectSubscribers.push(subscriber);
                    }
                }
                reset(false);
                isReconnect = true;
            }
            if (reconnectTimer) {
                $timeout.cancel(reconnectTimer);
            }
            reconnectTimer = $timeout(tryOpenSocket, RECONNECT_INTERVAL, false);
        }
    }

    function onMessage (message) {
        if (message.data) {
            var data = angular.fromJson(message.data);
            if (data.errorCode) {
                showWsError(data.errorCode, data.errorMsg);
            } else if (data.subscriptionId) {
                var subscriber = subscribers[data.subscriptionId];
                if (subscriber && data) {
                    var keys = fetchKeys(data.subscriptionId);
                    if (!data.data) {
                        data.data = {};
                    }
                    for (var k = 0; k < keys.length; k++) {
                        var key = keys[k];
                        if (!data.data[key]) {
                            data.data[key] = [];
                        }
                    }
                    subscriber.onData(data, data.subscriptionId);
                }
            }
        }
        checkToClose();
    }

    function showWsError(errorCode, errorMsg) {
        var message = 'WebSocket Error: ';
        if (errorMsg) {
            message += errorMsg;
        } else {
            message += "error code - " + errorCode + ".";
        }
        $mdUtil.nextTick(function () {
            toast.showError(message);
        });
    }

    function fetchKeys(subscriptionId) {
        var command = commands[subscriptionId];
        if (command && command.keys && command.keys.length > 0) {
            return command.keys.split(",");
        } else {
            return [];
        }
    }

    function nextCmdId () {
        lastCmdId++;
        return lastCmdId;
    }

    function subscribe (subscriber) {
        isActive = true;
        var cmdId;
        if (angular.isDefined(subscriber.subscriptionCommands)) {
            for (var i=0;i<subscriber.subscriptionCommands.length;i++) {
                var subscriptionCommand = subscriber.subscriptionCommands[i];
                cmdId = nextCmdId();
                subscribers[cmdId] = subscriber;
                subscriptionCommand.cmdId = cmdId;
                commands[cmdId] = subscriptionCommand;
                if (subscriber.type === types.dataKeyType.timeseries) {
                    cmdsWrapper.tsSubCmds.push(subscriptionCommand);
                } else if (subscriber.type === types.dataKeyType.attribute) {
                    cmdsWrapper.attrSubCmds.push(subscriptionCommand);
                }
            }
        }
        if (angular.isDefined(subscriber.historyCommands)) {
            for (i=0;i<subscriber.historyCommands.length;i++) {
                var historyCommand = subscriber.historyCommands[i];
                cmdId = nextCmdId();
                subscribers[cmdId] = subscriber;
                historyCommand.cmdId = cmdId;
                commands[cmdId] = historyCommand;
                cmdsWrapper.historyCmds.push(historyCommand);
            }
        }
        subscribersCount++;
        publishCommands();
    }

    function unsubscribe (subscriber) {
        if (isActive) {
            var cmdId = null;
            if (subscriber.subscriptionCommands) {
                for (var i=0;i<subscriber.subscriptionCommands.length;i++) {
                    var subscriptionCommand = subscriber.subscriptionCommands[i];
                    subscriptionCommand.unsubscribe = true;
                    if (subscriber.type === types.dataKeyType.timeseries) {
                        cmdsWrapper.tsSubCmds.push(subscriptionCommand);
                    } else if (subscriber.type === types.dataKeyType.attribute) {
                        cmdsWrapper.attrSubCmds.push(subscriptionCommand);
                    }
                    cmdId = subscriptionCommand.cmdId;
                    if (cmdId) {
                        if (subscribers[cmdId]) {
                            delete subscribers[cmdId];
                        }
                        if (commands[cmdId]) {
                            delete commands[cmdId];
                        }
                    }
                }
            }
            if (subscriber.historyCommands) {
                for (i=0;i<subscriber.historyCommands.length;i++) {
                    var historyCommand = subscriber.historyCommands[i];
                    cmdId = historyCommand.cmdId;
                    if (cmdId) {
                        if (subscribers[cmdId]) {
                            delete subscribers[cmdId];
                        }
                        if (commands[cmdId]) {
                            delete commands[cmdId];
                        }
                    }
                }
            }
            var index = reconnectSubscribers.indexOf(subscriber);
            if (index > -1) {
                reconnectSubscribers.splice(index, 1);
            }
            subscribersCount--;
            publishCommands();
        }
    }

    function checkToClose () {
        if (subscribersCount === 0 && isOpened) {
            if (!socketCloseTimer) {
                socketCloseTimer = $timeout(closeSocket, WS_IDLE_TIMEOUT, false);
            }
        }
    }

    function tryOpenSocket () {
        if (isActive) {
            if (!isOpened && !isOpening) {
                isOpening = true;
                if (userService.isJwtTokenValid()) {
                    openSocket(userService.getJwtToken());
                } else {
                    userService.refreshJwtToken().then(function success() {
                        openSocket(userService.getJwtToken());
                    }, function fail() {
                        isOpening = false;
                        $rootScope.$broadcast('unauthenticated');
                    });
                }
            }
            if (socketCloseTimer) {
                $timeout.cancel(socketCloseTimer);
                socketCloseTimer = null;
            }
        }
    }

    function openSocket(token) {
        dataStream = $websocket(telemetryUri + '?token=' + token);
        dataStream.onError(onError);
        dataStream.onOpen(onOpen);
        dataStream.onClose(onClose);
        dataStream.onMessage(onMessage, {autoApply: false});
    }

    function closeSocket() {
        isActive = false;
        if (isOpened) {
            dataStream.close();
        }
    }

    function reset(close) {
        if (socketCloseTimer) {
            $timeout.cancel(socketCloseTimer);
            socketCloseTimer = null;
        }
        lastCmdId = 0;
        subscribers = {};
        subscribersCount = 0;
        commands = {};
        cmdsWrapper.tsSubCmds = [];
        cmdsWrapper.historyCmds = [];
        cmdsWrapper.attrSubCmds = [];
        if (close) {
            closeSocket();
        }
    }
}