data-aggregator.js 9.24 KB
/*
 * Copyright © 2016-2017 The Thingsboard Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

export default class DataAggregator {

    constructor(onDataCb, tsKeyNames, startTs, limit, aggregationType, timeWindow, interval, types, $timeout, $filter) {
        this.onDataCb = onDataCb;
        this.tsKeyNames = tsKeyNames;
        this.startTs = startTs;
        this.aggregationType = aggregationType;
        this.types = types;
        this.$timeout = $timeout;
        this.$filter = $filter;
        this.dataReceived = false;
        this.resetPending = false;
        this.noAggregation = aggregationType === types.aggregation.none.value;
        this.limit = limit;
        this.timeWindow = timeWindow;
        this.interval = interval;
        this.aggregationTimeout = Math.max(this.interval, 1000);
        switch (aggregationType) {
            case types.aggregation.min.value:
                this.aggFunction = min;
                break;
            case types.aggregation.max.value:
                this.aggFunction = max;
                break;
            case types.aggregation.avg.value:
                this.aggFunction = avg;
                break;
            case types.aggregation.sum.value:
                this.aggFunction = sum;
                break;
            case types.aggregation.count.value:
                this.aggFunction = count;
                break;
            case types.aggregation.none.value:
                this.aggFunction = none;
                break;
            default:
                this.aggFunction = avg;
        }
    }

    reset(startTs, timeWindow, interval) {
        if (this.intervalTimeoutHandle) {
            this.$timeout.cancel(this.intervalTimeoutHandle);
            this.intervalTimeoutHandle = null;
        }
        this.intervalScheduledTime = currentTime();
        this.startTs = startTs;
        this.timeWindow = timeWindow;
        this.interval = interval;
        this.endTs = this.startTs + this.timeWindow;
        this.elapsed = 0;
        this.aggregationTimeout = Math.max(this.interval, 1000);
        this.resetPending = true;
        var self = this;
        this.intervalTimeoutHandle = this.$timeout(function() {
            self.onInterval();
        }, this.aggregationTimeout, false);
    }

    onData(data, update, history, apply) {
        if (!this.dataReceived || this.resetPending) {
            var updateIntervalScheduledTime = true;
            if (!this.dataReceived) {
                this.elapsed = 0;
                this.dataReceived = true;
                this.endTs = this.startTs + this.timeWindow;
            }
            if (this.resetPending) {
                this.resetPending = false;
                updateIntervalScheduledTime = false;
            }
            if (update) {
                this.aggregationMap = {};
                updateAggregatedData(this.aggregationMap, this.aggregationType === this.types.aggregation.count.value,
                    this.noAggregation, this.aggFunction, data.data, this.interval, this.startTs);
            } else {
                this.aggregationMap = processAggregatedData(data.data, this.aggregationType === this.types.aggregation.count.value, this.noAggregation);
            }
            if (updateIntervalScheduledTime) {
                this.intervalScheduledTime = currentTime();
            }
            this.onInterval(history, apply);
        } else {
            updateAggregatedData(this.aggregationMap, this.aggregationType === this.types.aggregation.count.value,
                this.noAggregation, this.aggFunction, data.data, this.interval, this.startTs);
            if (history) {
                this.intervalScheduledTime = currentTime();
                this.onInterval(history, apply);
            }
        }
    }

    onInterval(history, apply) {
        var now = currentTime();
        this.elapsed += now - this.intervalScheduledTime;
        this.intervalScheduledTime = now;
        if (this.intervalTimeoutHandle) {
            this.$timeout.cancel(this.intervalTimeoutHandle);
            this.intervalTimeoutHandle = null;
        }
        if (!history) {
            var delta = Math.floor(this.elapsed / this.interval);
            if (delta || !this.data) {
                this.startTs += delta * this.interval;
                this.endTs += delta * this.interval;
                this.data = toData(this.tsKeyNames, this.aggregationMap, this.startTs, this.endTs, this.$filter, this.limit);
                this.elapsed = this.elapsed - delta * this.interval;
            }
        } else {
            this.data = toData(this.tsKeyNames, this.aggregationMap, this.startTs, this.endTs, this.$filter, this.limit);
        }
        if (this.onDataCb) {
            this.onDataCb(this.data, this.startTs, this.endTs, apply);
        }

        var self = this;
        if (!history) {
            this.intervalTimeoutHandle = this.$timeout(function() {
                self.onInterval();
            }, this.aggregationTimeout, false);
        }
    }

    destroy() {
        if (this.intervalTimeoutHandle) {
            this.$timeout.cancel(this.intervalTimeoutHandle);
            this.intervalTimeoutHandle = null;
        }
        this.aggregationMap = null;
    }

}

/* eslint-disable */
function currentTime() {
    return window.performance && window.performance.now ?
        window.performance.now() : Date.now();
}
/* eslint-enable */

function processAggregatedData(data, isCount, noAggregation) {
    var aggregationMap = {};
    for (var key in data) {
        var aggKeyData = aggregationMap[key];
        if (!aggKeyData) {
            aggKeyData = {};
            aggregationMap[key] = aggKeyData;
        }
        var keyData = data[key];
        for (var i in keyData) {
            var kvPair = keyData[i];
            var timestamp = kvPair[0];
            var value = convertValue(kvPair[1], noAggregation);
            var aggKey = timestamp;
            var aggData = {
                count: isCount ? value : 1,
                sum: value,
                aggValue: value
            }
            aggKeyData[aggKey] = aggData;
        }
    }
    return aggregationMap;
}

function updateAggregatedData(aggregationMap, isCount, noAggregation, aggFunction, data, interval, startTs) {
    for (var key in data) {
        var aggKeyData = aggregationMap[key];
        if (!aggKeyData) {
            aggKeyData = {};
            aggregationMap[key] = aggKeyData;
        }
        var keyData = data[key];
        for (var i in keyData) {
            var kvPair = keyData[i];
            var timestamp = kvPair[0];
            var value = convertValue(kvPair[1], noAggregation);
            var aggTimestamp = noAggregation ? timestamp : (startTs + Math.floor((timestamp - startTs) / interval) * interval + interval/2);
            var aggData = aggKeyData[aggTimestamp];
            if (!aggData) {
                aggData = {
                    count: 1,
                    sum: value,
                    aggValue: isCount ? 1 : value
                }
                aggKeyData[aggTimestamp] = aggData;
            } else {
                aggFunction(aggData, value);
            }
        }
    }
}

function toData(tsKeyNames, aggregationMap, startTs, endTs, $filter, limit) {
    var data = {};
    for (var k in tsKeyNames) {
        data[tsKeyNames[k]] = [];
    }
    for (var key in aggregationMap) {
        var aggKeyData = aggregationMap[key];
        var keyData = data[key];
        for (var aggTimestamp in aggKeyData) {
            if (aggTimestamp <= startTs) {
                delete aggKeyData[aggTimestamp];
            } else if (aggTimestamp <= endTs) {
                var aggData = aggKeyData[aggTimestamp];
                var kvPair = [Number(aggTimestamp), aggData.aggValue];
                keyData.push(kvPair);
            }
        }
        keyData = $filter('orderBy')(keyData, '+this[0]');
        if (keyData.length > limit) {
            keyData = keyData.slice(keyData.length - limit);
        }
        data[key] = keyData;
    }
    return data;
}

function convertValue(value, noAggregation) {
    if (!noAggregation || value && isNumeric(value)) {
        return Number(value);
    } else {
        return value;
    }
}

function isNumeric(value) {
    return (value - parseFloat( value ) + 1) >= 0;
}

function avg(aggData, value) {
    aggData.count++;
    aggData.sum += value;
    aggData.aggValue = aggData.sum / aggData.count;
}

function min(aggData, value) {
    aggData.aggValue = Math.min(aggData.aggValue, value);
}

function max(aggData, value) {
    aggData.aggValue = Math.max(aggData.aggValue, value);
}

function sum(aggData, value) {
    aggData.aggValue = aggData.aggValue + value;
}

function count(aggData) {
    aggData.count++;
    aggData.aggValue = aggData.count;
}

function none(aggData, value) {
    aggData.aggValue = value;
}