Commit 22d8ce7189f8a32cfd69de2df1d250a8a0cc67f2

Authored by Igor Kulikov
1 parent 9a937379

Implement telemetry websocket service

... ... @@ -2,5 +2,9 @@
2 2 "/api": {
3 3 "target": "http://localhost:8080",
4 4 "secure": false
  5 + },
  6 + "/api/ws": {
  7 + "target": "ws://localhost:8080",
  8 + "ws": true
5 9 }
6 10 }
... ...
  1 +///
  2 +/// Copyright © 2016-2019 The Thingsboard Authors
  3 +///
  4 +/// Licensed under the Apache License, Version 2.0 (the "License");
  5 +/// you may not use this file except in compliance with the License.
  6 +/// You may obtain a copy of the License at
  7 +///
  8 +/// http://www.apache.org/licenses/LICENSE-2.0
  9 +///
  10 +/// Unless required by applicable law or agreed to in writing, software
  11 +/// distributed under the License is distributed on an "AS IS" BASIS,
  12 +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 +/// See the License for the specific language governing permissions and
  14 +/// limitations under the License.
  15 +///
  16 +
  17 +import { Inject, Injectable } from '@angular/core';
  18 +import {
  19 + AttributesSubscriptionCmd,
  20 + GetHistoryCmd,
  21 + SubscriptionCmd,
  22 + SubscriptionUpdate,
  23 + SubscriptionUpdateMsg,
  24 + TelemetryFeature,
  25 + TelemetryPluginCmdsWrapper,
  26 + TelemetryService,
  27 + TelemetrySubscriber,
  28 + TimeseriesSubscriptionCmd
  29 +} from '@app/shared/models/telemetry/telemetry.models';
  30 +import { select, Store } from '@ngrx/store';
  31 +import { AppState } from '@core/core.state';
  32 +import { AuthService } from '@core/auth/auth.service';
  33 +import { selectIsAuthenticated } from '@core/auth/auth.selectors';
  34 +import { WINDOW } from '@core/services/window.service';
  35 +import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
  36 +import { ActionNotificationShow } from '@core/notification/notification.actions';
  37 +import Timeout = NodeJS.Timeout;
  38 +
  39 +const RECONNECT_INTERVAL = 2000;
  40 +const WS_IDLE_TIMEOUT = 90000;
  41 +const MAX_PUBLISH_COMMANDS = 10;
  42 +
  43 +@Injectable({
  44 + providedIn: 'root'
  45 +})
  46 +export class TelemetryWebsocketService implements TelemetryService {
  47 +
  48 + isActive = false;
  49 + isOpening = false;
  50 + isOpened = false;
  51 + isReconnect = false;
  52 +
  53 + socketCloseTimer: Timeout;
  54 + reconnectTimer: Timeout;
  55 +
  56 + lastCmdId = 0;
  57 + subscribersCount = 0;
  58 + subscribersMap = new Map<number, TelemetrySubscriber>();
  59 +
  60 + reconnectSubscribers = new Set<TelemetrySubscriber>();
  61 +
  62 + cmdsWrapper = new TelemetryPluginCmdsWrapper();
  63 + telemetryUri: string;
  64 +
  65 + dataStream: WebSocketSubject<TelemetryPluginCmdsWrapper | SubscriptionUpdateMsg>;
  66 +
  67 + constructor(private store: Store<AppState>,
  68 + private authService: AuthService,
  69 + @Inject(WINDOW) private window: Window) {
  70 + this.store.pipe(select(selectIsAuthenticated)).subscribe(
  71 + (authenticated: boolean) => {
  72 + if (!authenticated) {
  73 + this.reset(true);
  74 + }
  75 + }
  76 + );
  77 +
  78 + let port = this.window.location.port;
  79 + if (this.window.location.protocol === 'https:') {
  80 + if (!port) {
  81 + port = '443';
  82 + }
  83 + this.telemetryUri = 'wss:';
  84 + } else {
  85 + if (!port) {
  86 + port = '80';
  87 + }
  88 + this.telemetryUri = 'ws:';
  89 + }
  90 + this.telemetryUri += `//${this.window.location.hostname}:${port}/api/ws/plugins/telemetry`;
  91 + }
  92 +
  93 + public subscribe(subscriber: TelemetrySubscriber) {
  94 + this.isActive = true;
  95 + subscriber.subscriptionCommands.forEach(
  96 + (subscriptionCommand) => {
  97 + const cmdId = this.nextCmdId();
  98 + this.subscribersMap.set(cmdId, subscriber);
  99 + subscriptionCommand.cmdId = cmdId;
  100 + if (subscriptionCommand instanceof SubscriptionCmd) {
  101 + if (subscriptionCommand.getType() === TelemetryFeature.TIMESERIES) {
  102 + this.cmdsWrapper.tsSubCmds.push(subscriptionCommand as TimeseriesSubscriptionCmd);
  103 + } else {
  104 + this.cmdsWrapper.attrSubCmds.push(subscriptionCommand as AttributesSubscriptionCmd);
  105 + }
  106 + } else if (subscriptionCommand instanceof GetHistoryCmd) {
  107 + this.cmdsWrapper.historyCmds.push(subscriptionCommand);
  108 + }
  109 + }
  110 + );
  111 + this.subscribersCount++;
  112 + this.publishCommands();
  113 + }
  114 +
  115 + public unsubscribe(subscriber: TelemetrySubscriber) {
  116 + if (this.isActive) {
  117 + subscriber.subscriptionCommands.forEach(
  118 + (subscriptionCommand) => {
  119 + if (subscriptionCommand instanceof SubscriptionCmd) {
  120 + subscriptionCommand.unsubscribe = true;
  121 + if (subscriptionCommand.getType() === TelemetryFeature.TIMESERIES) {
  122 + this.cmdsWrapper.tsSubCmds.push(subscriptionCommand as TimeseriesSubscriptionCmd);
  123 + } else {
  124 + this.cmdsWrapper.attrSubCmds.push(subscriptionCommand as AttributesSubscriptionCmd);
  125 + }
  126 + }
  127 + const cmdId = subscriptionCommand.cmdId;
  128 + if (cmdId) {
  129 + this.subscribersMap.delete(cmdId);
  130 + }
  131 + }
  132 + );
  133 + this.reconnectSubscribers.delete(subscriber);
  134 + this.subscribersCount--;
  135 + this.publishCommands();
  136 + }
  137 + }
  138 +
  139 + private nextCmdId(): number {
  140 + this.lastCmdId++;
  141 + return this.lastCmdId;
  142 + }
  143 +
  144 + private publishCommands() {
  145 + while (this.isOpened && this.cmdsWrapper.hasCommands()) {
  146 + this.dataStream.next(this.cmdsWrapper.preparePublishCommands(MAX_PUBLISH_COMMANDS));
  147 + this.checkToClose();
  148 + }
  149 + this.tryOpenSocket();
  150 + }
  151 +
  152 + private checkToClose() {
  153 + if (this.subscribersCount === 0 && this.isOpened) {
  154 + if (!this.socketCloseTimer) {
  155 + this.socketCloseTimer = setTimeout(
  156 + () => this.closeSocket(), WS_IDLE_TIMEOUT);
  157 + }
  158 + }
  159 + }
  160 +
  161 + private reset(close: boolean) {
  162 + if (this.socketCloseTimer) {
  163 + clearTimeout(this.socketCloseTimer);
  164 + this.socketCloseTimer = null;
  165 + }
  166 + this.lastCmdId = 0;
  167 + this.subscribersMap.clear();
  168 + this.subscribersCount = 0;
  169 + this.cmdsWrapper.clear();
  170 + if (close) {
  171 + this.closeSocket();
  172 + }
  173 + }
  174 +
  175 + private closeSocket() {
  176 + this.isActive = false;
  177 + if (this.isOpened) {
  178 + this.dataStream.unsubscribe();
  179 + }
  180 + }
  181 +
  182 + private tryOpenSocket() {
  183 + if (this.isActive) {
  184 + if (!this.isOpened && !this.isOpening) {
  185 + this.isOpening = true;
  186 + if (AuthService.isJwtTokenValid()) {
  187 + this.openSocket(AuthService.getJwtToken());
  188 + } else {
  189 + this.authService.refreshJwtToken().subscribe(() => {
  190 + this.openSocket(AuthService.getJwtToken());
  191 + },
  192 + () => {
  193 + this.isOpening = false;
  194 + this.authService.logout(true);
  195 + }
  196 + );
  197 + }
  198 + }
  199 + if (this.socketCloseTimer) {
  200 + clearTimeout(this.socketCloseTimer);
  201 + this.socketCloseTimer = null;
  202 + }
  203 + }
  204 + }
  205 +
  206 + private openSocket(token: string) {
  207 + const uri = `${this.telemetryUri}?token=${token}`;
  208 + this.dataStream = webSocket(
  209 + {
  210 + url: uri,
  211 + openObserver: {
  212 + next: (e: Event) => {
  213 + this.onOpen();
  214 + }
  215 + },
  216 + closeObserver: {
  217 + next: (e: CloseEvent) => {
  218 + this.onClose(e);
  219 + }
  220 + }
  221 + }
  222 + );
  223 +
  224 + this.dataStream.subscribe((message) => {
  225 + this.onMessage(message as SubscriptionUpdateMsg);
  226 + },
  227 + (error) => {
  228 + this.onError(error);
  229 + });
  230 + }
  231 +
  232 + private onOpen() {
  233 + this.isOpening = false;
  234 + this.isOpened = true;
  235 + if (this.reconnectTimer) {
  236 + clearTimeout(this.reconnectTimer);
  237 + this.reconnectTimer = null;
  238 + }
  239 + if (this.isReconnect) {
  240 + this.isReconnect = false;
  241 + this.reconnectSubscribers.forEach(
  242 + (reconnectSubscriber) => {
  243 + reconnectSubscriber.onReconnected();
  244 + this.subscribe(reconnectSubscriber);
  245 + }
  246 + );
  247 + this.reconnectSubscribers.clear();
  248 + } else {
  249 + this.publishCommands();
  250 + }
  251 + }
  252 +
  253 + private onMessage(message: SubscriptionUpdateMsg) {
  254 + if (message.errorCode) {
  255 + this.showWsError(message.errorCode, message.errorMsg);
  256 + } else if (message.subscriptionId) {
  257 + const subscriber = this.subscribersMap.get(message.subscriptionId);
  258 + if (subscriber) {
  259 + subscriber.onData(new SubscriptionUpdate(message));
  260 + }
  261 + }
  262 + this.checkToClose();
  263 + }
  264 +
  265 + private onError(errorEvent) {
  266 + if (errorEvent) {
  267 + console.warn('WebSocket error event', errorEvent);
  268 + }
  269 + this.isOpening = false;
  270 + }
  271 +
  272 + private onClose(closeEvent: CloseEvent) {
  273 + if (closeEvent && closeEvent.code > 1000 && closeEvent.code !== 1006) {
  274 + this.showWsError(closeEvent.code, closeEvent.reason);
  275 + }
  276 + this.isOpening = false;
  277 + this.isOpened = false;
  278 + if (this.isActive) {
  279 + if (!this.isReconnect) {
  280 + this.reconnectSubscribers.clear();
  281 + this.subscribersMap.forEach(
  282 + (subscriber) => {
  283 + this.reconnectSubscribers.add(subscriber);
  284 + }
  285 + );
  286 + this.reset(false);
  287 + this.isReconnect = true;
  288 + }
  289 + if (this.reconnectTimer) {
  290 + clearTimeout(this.reconnectTimer);
  291 + }
  292 + this.reconnectTimer = setTimeout(() => this.tryOpenSocket(), RECONNECT_INTERVAL);
  293 + }
  294 + }
  295 +
  296 + private showWsError(errorCode: number, errorMsg: string) {
  297 + let message = 'WebSocket Error: ';
  298 + if (errorMsg) {
  299 + message += errorMsg;
  300 + } else {
  301 + message += `error code - ${errorCode}.`;
  302 + }
  303 + this.store.dispatch(new ActionNotificationShow(
  304 + {
  305 + message, type: 'error'
  306 + }));
  307 + }
  308 +
  309 +}
... ...
... ... @@ -85,7 +85,8 @@
85 85 {count: dataSource.selection.selected.length}) | async }}
86 86 </span>
87 87 <span fxFlex></span>
88   - <button mat-button mat-icon-button [disabled]="isLoading$ | async"
  88 + <button [fxShow]="!isClientSideTelemetryTypeMap.get(attributeScope)"
  89 + mat-button mat-icon-button [disabled]="isLoading$ | async"
89 90 matTooltip="{{ 'action.delete' | translate }}"
90 91 matTooltipPosition="above"
91 92 (click)="deleteAttributes($event)">
... ...
... ... @@ -61,6 +61,7 @@ import {
61 61 EditAttributeValuePanelData
62 62 } from './edit-attribute-value-panel.component';
63 63 import { ComponentPortal, PortalInjector } from '@angular/cdk/portal';
  64 +import { TelemetryWebsocketService } from '@core/ws/telemetry-websocket.service';
64 65
65 66
66 67 @Component({
... ... @@ -137,6 +138,7 @@ export class AttributeTableComponent extends PageComponent implements AfterViewI
137 138
138 139 constructor(protected store: Store<AppState>,
139 140 private attributeService: AttributeService,
  141 + private telemetryWsService: TelemetryWebsocketService,
140 142 public translate: TranslateService,
141 143 public dialog: MatDialog,
142 144 private overlay: Overlay,
... ... @@ -146,7 +148,7 @@ export class AttributeTableComponent extends PageComponent implements AfterViewI
146 148 this.dirtyValue = !this.activeValue;
147 149 const sortOrder: SortOrder = { property: 'key', direction: Direction.ASC };
148 150 this.pageLink = new PageLink(10, 0, null, sortOrder);
149   - this.dataSource = new AttributeDatasource(this.attributeService, this.translate);
  151 + this.dataSource = new AttributeDatasource(this.attributeService, this.telemetryWsService, this.translate);
150 152 }
151 153
152 154 ngOnInit() {
... ... @@ -333,7 +335,7 @@ export class AttributeTableComponent extends PageComponent implements AfterViewI
333 335
334 336 exitWidgetMode() {
335 337 this.mode = 'default';
336   - this.reloadAttributes();
  338 + // this.reloadAttributes();
337 339
338 340 // TODO:
339 341 }
... ...
... ... @@ -26,9 +26,11 @@ import {
26 26 AttributeData,
27 27 AttributeScope,
28 28 isClientSideTelemetryType,
29   - TelemetryType
  29 + TelemetryType,
  30 + TelemetrySubscriber
30 31 } from '@shared/models/telemetry/telemetry.models';
31 32 import { AttributeService } from '@core/http/attribute.service';
  33 +import { TelemetryWebsocketService } from '@core/ws/telemetry-websocket.service';
32 34
33 35 export class AttributeDatasource implements DataSource<AttributeData> {
34 36
... ... @@ -40,8 +42,10 @@ export class AttributeDatasource implements DataSource<AttributeData> {
40 42 public selection = new SelectionModel<AttributeData>(true, []);
41 43
42 44 private allAttributes: Observable<Array<AttributeData>>;
  45 + private telemetrySubscriber: TelemetrySubscriber;
43 46
44 47 constructor(private attributeService: AttributeService,
  48 + private telemetryWsService: TelemetryWebsocketService,
45 49 private translate: TranslateService) {}
46 50
47 51 connect(collectionViewer: CollectionViewer): Observable<AttributeData[] | ReadonlyArray<AttributeData>> {
... ... @@ -51,18 +55,24 @@ export class AttributeDatasource implements DataSource<AttributeData> {
51 55 disconnect(collectionViewer: CollectionViewer): void {
52 56 this.attributesSubject.complete();
53 57 this.pageDataSubject.complete();
  58 + if (this.telemetrySubscriber) {
  59 + this.telemetrySubscriber.unsubscribe();
  60 + this.telemetrySubscriber = null;
  61 + }
54 62 }
55 63
56 64 loadAttributes(entityId: EntityId, attributesScope: TelemetryType,
57 65 pageLink: PageLink, reload: boolean = false): Observable<PageData<AttributeData>> {
58 66 if (reload) {
59 67 this.allAttributes = null;
  68 + if (this.telemetrySubscriber) {
  69 + this.telemetrySubscriber.unsubscribe();
  70 + this.telemetrySubscriber = null;
  71 + }
60 72 }
  73 + this.selection.clear();
61 74 const result = new ReplaySubject<PageData<AttributeData>>();
62 75 this.fetchAttributes(entityId, attributesScope, pageLink).pipe(
63   - tap(() => {
64   - this.selection.clear();
65   - }),
66 76 catchError(() => of(emptyPageData<AttributeData>())),
67 77 ).subscribe(
68 78 (pageData) => {
... ... @@ -85,8 +95,10 @@ export class AttributeDatasource implements DataSource<AttributeData> {
85 95 if (!this.allAttributes) {
86 96 let attributesObservable: Observable<Array<AttributeData>>;
87 97 if (isClientSideTelemetryType.get(attributesScope)) {
88   - attributesObservable = of([]);
89   - // TODO:
  98 + this.telemetrySubscriber = TelemetrySubscriber.createEntityAttributesSubscription(
  99 + this.telemetryWsService, entityId, attributesScope);
  100 + this.telemetrySubscriber.subscribe();
  101 + attributesObservable = this.telemetrySubscriber.attributeData$();
90 102 } else {
91 103 attributesObservable = this.attributeService.getEntityAttributes(entityId, attributesScope as AttributeScope);
92 104 }
... ...
... ... @@ -16,6 +16,21 @@
16 16
17 17 -->
18 18 <mat-tab *ngIf="entity"
  19 + label="{{ 'attribute.attributes' | translate }}" #attributesTab="matTab">
  20 + <tb-attribute-table [active]="attributesTab.isActive"
  21 + [entityId]="entity.id"
  22 + [defaultAttributeScope]="attributeScopes.SERVER_SCOPE">
  23 + </tb-attribute-table>
  24 +</mat-tab>
  25 +<mat-tab *ngIf="entity"
  26 + label="{{ 'attribute.latest-telemetry' | translate }}" #telemetryTab="matTab">
  27 + <tb-attribute-table [active]="telemetryTab.isActive"
  28 + [entityId]="entity.id"
  29 + [defaultAttributeScope]="latestTelemetryTypes.LATEST_TELEMETRY"
  30 + disableAttributeScopeSelection>
  31 + </tb-attribute-table>
  32 +</mat-tab>
  33 +<mat-tab *ngIf="entity"
19 34 label="{{ 'alarm.alarms' | translate }}" #alarmsTab="matTab">
20 35 <tb-alarm-table [active]="alarmsTab.isActive" [entityId]="entity.id"></tb-alarm-table>
21 36 </mat-tab>
... ...
... ... @@ -16,6 +16,21 @@
16 16
17 17 -->
18 18 <mat-tab *ngIf="entity"
  19 + label="{{ 'attribute.attributes' | translate }}" #attributesTab="matTab">
  20 + <tb-attribute-table [active]="attributesTab.isActive"
  21 + [entityId]="entity.id"
  22 + [defaultAttributeScope]="attributeScopes.SERVER_SCOPE">
  23 + </tb-attribute-table>
  24 +</mat-tab>
  25 +<mat-tab *ngIf="entity"
  26 + label="{{ 'attribute.latest-telemetry' | translate }}" #telemetryTab="matTab">
  27 + <tb-attribute-table [active]="telemetryTab.isActive"
  28 + [entityId]="entity.id"
  29 + [defaultAttributeScope]="latestTelemetryTypes.LATEST_TELEMETRY"
  30 + disableAttributeScopeSelection>
  31 + </tb-attribute-table>
  32 +</mat-tab>
  33 +<mat-tab *ngIf="entity"
19 34 label="{{ 'alarm.alarms' | translate }}" #alarmsTab="matTab">
20 35 <tb-alarm-table [active]="alarmsTab.isActive" [entityId]="entity.id"></tb-alarm-table>
21 36 </mat-tab>
... ...
... ... @@ -16,6 +16,21 @@
16 16
17 17 -->
18 18 <mat-tab *ngIf="entity"
  19 + label="{{ 'attribute.attributes' | translate }}" #attributesTab="matTab">
  20 + <tb-attribute-table [active]="attributesTab.isActive"
  21 + [entityId]="entity.id"
  22 + [defaultAttributeScope]="attributeScopes.CLIENT_SCOPE">
  23 + </tb-attribute-table>
  24 +</mat-tab>
  25 +<mat-tab *ngIf="entity"
  26 + label="{{ 'attribute.latest-telemetry' | translate }}" #telemetryTab="matTab">
  27 + <tb-attribute-table [active]="telemetryTab.isActive"
  28 + [entityId]="entity.id"
  29 + [defaultAttributeScope]="latestTelemetryTypes.LATEST_TELEMETRY"
  30 + disableAttributeScopeSelection>
  31 + </tb-attribute-table>
  32 +</mat-tab>
  33 +<mat-tab *ngIf="entity"
19 34 label="{{ 'alarm.alarms' | translate }}" #alarmsTab="matTab">
20 35 <tb-alarm-table [active]="alarmsTab.isActive" [entityId]="entity.id"></tb-alarm-table>
21 36 </mat-tab>
... ...
... ... @@ -16,6 +16,21 @@
16 16
17 17 -->
18 18 <mat-tab *ngIf="entity"
  19 + label="{{ 'attribute.attributes' | translate }}" #attributesTab="matTab">
  20 + <tb-attribute-table [active]="attributesTab.isActive"
  21 + [entityId]="entity.id"
  22 + [defaultAttributeScope]="attributeScopes.SERVER_SCOPE">
  23 + </tb-attribute-table>
  24 +</mat-tab>
  25 +<mat-tab *ngIf="entity"
  26 + label="{{ 'attribute.latest-telemetry' | translate }}" #telemetryTab="matTab">
  27 + <tb-attribute-table [active]="telemetryTab.isActive"
  28 + [entityId]="entity.id"
  29 + [defaultAttributeScope]="latestTelemetryTypes.LATEST_TELEMETRY"
  30 + disableAttributeScopeSelection>
  31 + </tb-attribute-table>
  32 +</mat-tab>
  33 +<mat-tab *ngIf="entity"
19 34 label="{{ 'alarm.alarms' | translate }}" #alarmsTab="matTab">
20 35 <tb-alarm-table [active]="alarmsTab.isActive" [entityId]="entity.id"></tb-alarm-table>
21 36 </mat-tab>
... ...
... ... @@ -16,6 +16,21 @@
16 16
17 17 -->
18 18 <mat-tab *ngIf="entity"
  19 + label="{{ 'attribute.attributes' | translate }}" #attributesTab="matTab">
  20 + <tb-attribute-table [active]="attributesTab.isActive"
  21 + [entityId]="entity.id"
  22 + [defaultAttributeScope]="attributeScopes.SERVER_SCOPE">
  23 + </tb-attribute-table>
  24 +</mat-tab>
  25 +<mat-tab *ngIf="entity"
  26 + label="{{ 'attribute.latest-telemetry' | translate }}" #telemetryTab="matTab">
  27 + <tb-attribute-table [active]="telemetryTab.isActive"
  28 + [entityId]="entity.id"
  29 + [defaultAttributeScope]="latestTelemetryTypes.LATEST_TELEMETRY"
  30 + disableAttributeScopeSelection>
  31 + </tb-attribute-table>
  32 +</mat-tab>
  33 +<mat-tab *ngIf="entity"
19 34 label="{{ 'alarm.alarms' | translate }}" #alarmsTab="matTab">
20 35 <tb-alarm-table [active]="alarmsTab.isActive" [entityId]="entity.id"></tb-alarm-table>
21 36 </mat-tab>
... ...
... ... @@ -15,7 +15,11 @@
15 15 ///
16 16
17 17
18   -import { AlarmSeverity } from '@shared/models/alarm.models';
  18 +import { EntityType } from '@shared/models/entity-type.models';
  19 +import { AggregationType } from '../time/time.models';
  20 +import { Observable, ReplaySubject, Subject } from 'rxjs';
  21 +import { EntityId } from '@shared/models/id/entity-id';
  22 +import { map } from 'rxjs/operators';
19 23
20 24 export enum DataKeyType {
21 25 timeseries = 'timeseries',
... ... @@ -34,6 +38,11 @@ export enum AttributeScope {
34 38 SHARED_SCOPE = 'SHARED_SCOPE'
35 39 }
36 40
  41 +export enum TelemetryFeature {
  42 + ATTRIBUTES = 'ATTRIBUTES',
  43 + TIMESERIES = 'TIMESERIES'
  44 +}
  45 +
37 46 export type TelemetryType = LatestTelemetry | AttributeScope;
38 47
39 48 export const telemetryTypeTranslations = new Map<TelemetryType, string>(
... ... @@ -59,3 +68,222 @@ export interface AttributeData {
59 68 key: string;
60 69 value: any;
61 70 }
  71 +
  72 +export interface TelemetryPluginCmd {
  73 + cmdId: number;
  74 + keys: string;
  75 +}
  76 +
  77 +export abstract class SubscriptionCmd implements TelemetryPluginCmd {
  78 + cmdId: number;
  79 + keys: string;
  80 + entityType: EntityType;
  81 + entityId: string;
  82 + scope?: AttributeScope;
  83 + unsubscribe: boolean;
  84 + abstract getType(): TelemetryFeature;
  85 +}
  86 +
  87 +export class AttributesSubscriptionCmd extends SubscriptionCmd {
  88 + getType() {
  89 + return TelemetryFeature.ATTRIBUTES;
  90 + }
  91 +}
  92 +
  93 +export class TimeseriesSubscriptionCmd extends SubscriptionCmd {
  94 + startTs: number;
  95 + timeWindow: number;
  96 + interval: number;
  97 + limit: number;
  98 + agg: AggregationType;
  99 +
  100 + getType() {
  101 + return TelemetryFeature.TIMESERIES;
  102 + }
  103 +}
  104 +
  105 +export class GetHistoryCmd implements TelemetryPluginCmd {
  106 + cmdId: number;
  107 + keys: string;
  108 + entityType: EntityType;
  109 + entityId: string;
  110 + startTs: number;
  111 + endTs: number;
  112 + interval: number;
  113 + limit: number;
  114 + agg: AggregationType;
  115 +}
  116 +
  117 +export class TelemetryPluginCmdsWrapper {
  118 + attrSubCmds: Array<AttributesSubscriptionCmd>;
  119 + tsSubCmds: Array<TimeseriesSubscriptionCmd>;
  120 + historyCmds: Array<GetHistoryCmd>;
  121 +
  122 + constructor() {
  123 + this.attrSubCmds = [];
  124 + this.tsSubCmds = [];
  125 + this.historyCmds = [];
  126 + }
  127 +
  128 + public hasCommands(): boolean {
  129 + return this.tsSubCmds.length > 0 ||
  130 + this.historyCmds.length > 0 ||
  131 + this.attrSubCmds.length > 0;
  132 + }
  133 +
  134 + public clear() {
  135 + this.attrSubCmds.length = 0;
  136 + this.tsSubCmds.length = 0;
  137 + this.historyCmds.length = 0;
  138 + }
  139 +
  140 + public preparePublishCommands(maxCommands: number): TelemetryPluginCmdsWrapper {
  141 + const preparedWrapper = new TelemetryPluginCmdsWrapper();
  142 + let leftCount = maxCommands;
  143 + preparedWrapper.tsSubCmds = this.popCmds(this.tsSubCmds, leftCount);
  144 + leftCount -= preparedWrapper.tsSubCmds.length;
  145 + preparedWrapper.historyCmds = this.popCmds(this.historyCmds, leftCount);
  146 + leftCount -= preparedWrapper.historyCmds.length;
  147 + preparedWrapper.attrSubCmds = this.popCmds(this.attrSubCmds, leftCount);
  148 + return preparedWrapper;
  149 + }
  150 +
  151 + private popCmds<T extends TelemetryPluginCmd>(cmds: Array<T>, leftCount: number): Array<T> {
  152 + const toPublish = Math.min(cmds.length, leftCount);
  153 + if (toPublish > 0) {
  154 + return cmds.splice(0, toPublish);
  155 + } else {
  156 + return [];
  157 + }
  158 + }
  159 +}
  160 +
  161 +export interface SubscriptionUpdateMsg {
  162 + subscriptionId: number;
  163 + errorCode: number;
  164 + errorMsg: string;
  165 + data: {[key: string]: [number, string][]};
  166 +}
  167 +
  168 +export class SubscriptionUpdate implements SubscriptionUpdateMsg {
  169 + subscriptionId: number;
  170 + errorCode: number;
  171 + errorMsg: string;
  172 + data: {[key: string]: [number, string][]};
  173 +
  174 + constructor(msg: SubscriptionUpdateMsg) {
  175 + this.subscriptionId = msg.subscriptionId;
  176 + this.errorCode = msg.errorCode;
  177 + this.errorMsg = msg.errorMsg;
  178 + this.data = msg.data;
  179 + }
  180 +
  181 + public prepareData(keys: string[]) {
  182 + if (!this.data) {
  183 + this.data = {};
  184 + }
  185 + if (keys) {
  186 + keys.forEach((key) => {
  187 + if (!this.data[key]) {
  188 + this.data[key] = [];
  189 + }
  190 + });
  191 + }
  192 + }
  193 +
  194 + public updateAttributeData(origData: Array<AttributeData>): Array<AttributeData> {
  195 + for (const key of Object.keys(this.data)) {
  196 + const keyData = this.data[key];
  197 + if (keyData.length) {
  198 + const existing = origData.find((data) => data.key === key);
  199 + if (existing) {
  200 + existing.lastUpdateTs = keyData[0][0];
  201 + existing.value = keyData[0][1];
  202 + } else {
  203 + origData.push(
  204 + {
  205 + key,
  206 + lastUpdateTs: keyData[0][0],
  207 + value: keyData[0][1]
  208 + }
  209 + );
  210 + }
  211 + }
  212 + }
  213 + return origData;
  214 + }
  215 +}
  216 +
  217 +export interface TelemetryService {
  218 + subscribe(subscriber: TelemetrySubscriber);
  219 + unsubscribe(subscriber: TelemetrySubscriber);
  220 +}
  221 +
  222 +export class TelemetrySubscriber {
  223 +
  224 + private dataSubject = new ReplaySubject<SubscriptionUpdate>();
  225 + private reconnectSubject = new Subject();
  226 +
  227 + public subscriptionCommands: Array<TelemetryPluginCmd>;
  228 +
  229 + public data$ = this.dataSubject.asObservable();
  230 + public reconnect$ = this.reconnectSubject.asObservable();
  231 +
  232 + public static createEntityAttributesSubscription(telemetryService: TelemetryService,
  233 + entityId: EntityId, attributeScope: TelemetryType,
  234 + keys: string[] = null): TelemetrySubscriber {
  235 + let subscriptionCommand: SubscriptionCmd;
  236 + if (attributeScope === LatestTelemetry.LATEST_TELEMETRY) {
  237 + subscriptionCommand = new TimeseriesSubscriptionCmd();
  238 + } else {
  239 + subscriptionCommand = new AttributesSubscriptionCmd();
  240 + }
  241 + subscriptionCommand.entityType = entityId.entityType as EntityType;
  242 + subscriptionCommand.entityId = entityId.id;
  243 + subscriptionCommand.scope = attributeScope as AttributeScope;
  244 + if (keys) {
  245 + subscriptionCommand.keys = keys.join(',');
  246 + }
  247 + const subscriber = new TelemetrySubscriber(telemetryService);
  248 + subscriber.subscriptionCommands.push(subscriptionCommand);
  249 + return subscriber;
  250 + }
  251 +
  252 + constructor(private telemetryService: TelemetryService) {
  253 + this.subscriptionCommands = [];
  254 + }
  255 +
  256 + public subscribe() {
  257 + this.telemetryService.subscribe(this);
  258 + }
  259 +
  260 + public unsubscribe() {
  261 + this.telemetryService.unsubscribe(this);
  262 + this.dataSubject.complete();
  263 + this.reconnectSubject.complete();
  264 + }
  265 +
  266 + public onData(message: SubscriptionUpdate) {
  267 + const cmdId = message.subscriptionId;
  268 + let keys: string[];
  269 + const cmd = this.subscriptionCommands.find((command) => command.cmdId === cmdId);
  270 + if (cmd) {
  271 + if (cmd.keys && cmd.keys.length) {
  272 + keys = cmd.keys.split(',');
  273 + }
  274 + }
  275 + message.prepareData(keys);
  276 + this.dataSubject.next(message);
  277 + }
  278 +
  279 + public onReconnected() {
  280 + this.reconnectSubject.next();
  281 + }
  282 +
  283 + public attributeData$(): Observable<Array<AttributeData>> {
  284 + const attributeData = new Array<AttributeData>();
  285 + return this.data$.pipe(
  286 + map((message) => message.updateAttributeData(attributeData))
  287 + );
  288 + }
  289 +}
... ...