Commit 391c88e8cba1183de1d2990b524c9fcfce53736c
1 parent
e6e9be18
UI: Improve data subscription reconnect flow
Showing
6 changed files
with
62 additions
and
27 deletions
... | ... | @@ -37,7 +37,7 @@ import { |
37 | 37 | } from '@shared/models/telemetry/telemetry.models'; |
38 | 38 | import { UtilsService } from '@core/services/utils.service'; |
39 | 39 | import { EntityDataListener, EntityDataLoadResult } from '@core/api/entity-data.service'; |
40 | -import { deepClone, isDefinedAndNotNull, isObject, objectHashCode } from '@core/utils'; | |
40 | +import { deepClone, isDefinedAndNotNull, isEqual, isObject, objectHashCode } from '@core/utils'; | |
41 | 41 | import { PageData } from '@shared/models/page/page-data'; |
42 | 42 | import { DataAggregator } from '@core/api/data-aggregator'; |
43 | 43 | import { NULL_UUID } from '@shared/models/id/has-uuid'; |
... | ... | @@ -50,7 +50,7 @@ export interface EntityDataSubscriptionOptions { |
50 | 50 | dataKeys: Array<SubscriptionDataKey>; |
51 | 51 | type: widgetType; |
52 | 52 | entityFilter?: EntityFilter; |
53 | - isLatestDataSubscription?: boolean; | |
53 | + isPaginatedDataSubscription?: boolean; | |
54 | 54 | pageLink?: EntityDataPageLink; |
55 | 55 | keyFilters?: Array<KeyFilter>; |
56 | 56 | subscriptionTimewindow?: SubscriptionTimewindow; |
... | ... | @@ -154,7 +154,7 @@ export class EntityDataSubscription { |
154 | 154 | } |
155 | 155 | |
156 | 156 | public subscribe(): Observable<EntityDataLoadResult> { |
157 | - if (!this.entityDataSubscriptionOptions.isLatestDataSubscription) { | |
157 | + if (!this.entityDataSubscriptionOptions.isPaginatedDataSubscription) { | |
158 | 158 | this.entityDataResolveSubject = new ReplaySubject(1); |
159 | 159 | } else { |
160 | 160 | this.started = true; |
... | ... | @@ -199,7 +199,7 @@ export class EntityDataSubscription { |
199 | 199 | latestValues: this.latestValues |
200 | 200 | }; |
201 | 201 | |
202 | - if (this.entityDataSubscriptionOptions.isLatestDataSubscription) { | |
202 | + if (this.entityDataSubscriptionOptions.isPaginatedDataSubscription) { | |
203 | 203 | if (this.entityDataSubscriptionOptions.type === widgetType.latest) { |
204 | 204 | if (this.latestValues.length > 0) { |
205 | 205 | this.dataCommand.latestCmd = { |
... | ... | @@ -222,7 +222,7 @@ export class EntityDataSubscription { |
222 | 222 | ); |
223 | 223 | |
224 | 224 | this.subscriber.reconnect$.subscribe(() => { |
225 | - if (this.started && !this.entityDataSubscriptionOptions.isLatestDataSubscription) { | |
225 | + if (this.started && !this.entityDataSubscriptionOptions.isPaginatedDataSubscription) { | |
226 | 226 | if (this.entityDataSubscriptionOptions.type === widgetType.timeseries && |
227 | 227 | !this.history && this.tsFields.length) { |
228 | 228 | const newSubsTw: SubscriptionTimewindow = this.listener.updateRealtimeSubscription(); |
... | ... | @@ -271,14 +271,14 @@ export class EntityDataSubscription { |
271 | 271 | totalPages: 1 |
272 | 272 | }; |
273 | 273 | this.onPageData(pageData); |
274 | - if (this.entityDataSubscriptionOptions.isLatestDataSubscription) { | |
274 | + if (this.entityDataSubscriptionOptions.isPaginatedDataSubscription) { | |
275 | 275 | if (this.entityDataSubscriptionOptions.type === widgetType.latest) { |
276 | 276 | this.frequency = 1000; |
277 | 277 | this.timer = setTimeout(this.onTick.bind(this, true), 0); |
278 | 278 | } |
279 | 279 | } |
280 | 280 | } |
281 | - if (this.entityDataSubscriptionOptions.isLatestDataSubscription) { | |
281 | + if (this.entityDataSubscriptionOptions.isPaginatedDataSubscription) { | |
282 | 282 | return of(null); |
283 | 283 | } else { |
284 | 284 | return this.entityDataResolveSubject.asObservable(); |
... | ... | @@ -286,7 +286,7 @@ export class EntityDataSubscription { |
286 | 286 | } |
287 | 287 | |
288 | 288 | public start() { |
289 | - if (this.entityDataSubscriptionOptions.isLatestDataSubscription) { | |
289 | + if (this.entityDataSubscriptionOptions.isPaginatedDataSubscription) { | |
290 | 290 | return; |
291 | 291 | } |
292 | 292 | this.subsTw = this.entityDataSubscriptionOptions.subscriptionTimewindow; |
... | ... | @@ -432,9 +432,26 @@ export class EntityDataSubscription { |
432 | 432 | } |
433 | 433 | } |
434 | 434 | |
435 | + private pageDataChanged(prevPageData: PageData<EntityData>, nextPageData: PageData<EntityData>) { | |
436 | + const prevIds = prevPageData.data.map((entityData) => entityData.entityId.id); | |
437 | + const nextIds = nextPageData.data.map((entityData) => entityData.entityId.id); | |
438 | + return !isEqual(prevIds, nextIds); | |
439 | + } | |
440 | + | |
435 | 441 | private onPageData(pageData: PageData<EntityData>) { |
442 | + const isInitialData = !this.pageData; | |
443 | + if (!isInitialData && !this.entityDataSubscriptionOptions.isPaginatedDataSubscription) { | |
444 | + if (this.pageDataChanged(this.pageData, pageData)) { | |
445 | + if (this.listener.initialPageDataChanged) { | |
446 | + this.listener.initialPageDataChanged(pageData); | |
447 | + } | |
448 | + return; | |
449 | + } | |
450 | + } | |
436 | 451 | this.pageData = pageData; |
437 | - this.resetData(); | |
452 | + if (isInitialData || this.entityDataSubscriptionOptions.isPaginatedDataSubscription) { | |
453 | + this.resetData(); | |
454 | + } | |
438 | 455 | const data: Array<Array<DataSetHolder>> = []; |
439 | 456 | for (let dataIndex = 0; dataIndex < pageData.data.length; dataIndex++) { |
440 | 457 | const entityData = pageData.data[dataIndex]; |
... | ... | @@ -458,8 +475,10 @@ export class EntityDataSubscription { |
458 | 475 | ); |
459 | 476 | this.entityDataResolveSubject.complete(); |
460 | 477 | } else { |
461 | - this.listener.dataLoaded(pageData, data, | |
462 | - this.listener.configDatasourceIndex); | |
478 | + if (isInitialData || this.entityDataSubscriptionOptions.isPaginatedDataSubscription) { | |
479 | + this.listener.dataLoaded(pageData, data, | |
480 | + this.listener.configDatasourceIndex); | |
481 | + } | |
463 | 482 | } |
464 | 483 | } |
465 | 484 | |
... | ... | @@ -487,8 +506,13 @@ export class EntityDataSubscription { |
487 | 506 | } |
488 | 507 | if (this.entityDataSubscriptionOptions.type === widgetType.timeseries && entityData.timeseries) { |
489 | 508 | const subscriptionData = this.toSubscriptionData(entityData.timeseries, true); |
490 | - if (!this.history && aggregate) { | |
491 | - this.dataAggregators[dataIndex].onData({data: subscriptionData}, false, false, true); | |
509 | + if (!this.history) { | |
510 | + if (this.dataAggregators && this.dataAggregators[dataIndex]) { | |
511 | + this.dataAggregators[dataIndex].onData({data: subscriptionData}, false, false, true); | |
512 | + } | |
513 | + if (!aggregate) { | |
514 | + this.onData(subscriptionData, DataKeyType.timeseries, dataIndex, true, dataUpdatedCb); | |
515 | + } | |
492 | 516 | } else { |
493 | 517 | this.onData(subscriptionData, DataKeyType.timeseries, dataIndex, true, dataUpdatedCb); |
494 | 518 | } | ... | ... |
... | ... | @@ -33,6 +33,7 @@ export interface EntityDataListener { |
33 | 33 | configDatasourceIndex: number; |
34 | 34 | dataLoaded: (pageData: PageData<EntityData>, data: Array<Array<DataSetHolder>>, datasourceIndex: number) => void; |
35 | 35 | dataUpdated: (data: DataSetHolder, datasourceIndex: number, dataIndex: number, dataKeyIndex: number, detectChanges: boolean) => void; |
36 | + initialPageDataChanged?: (nextPageData: PageData<EntityData>) => void; | |
36 | 37 | updateRealtimeSubscription?: () => SubscriptionTimewindow; |
37 | 38 | setRealtimeSubscription?: (subscriptionTimewindow: SubscriptionTimewindow) => void; |
38 | 39 | subscription?: EntityDataSubscription; |
... | ... | @@ -70,9 +71,9 @@ export class EntityDataService { |
70 | 71 | listener.subscription.start(); |
71 | 72 | } |
72 | 73 | |
73 | - public subscribeForLatestData(listener: EntityDataListener, | |
74 | - pageLink: EntityDataPageLink, | |
75 | - keyFilters: KeyFilter[]) { | |
74 | + public subscribeForPaginatedData(listener: EntityDataListener, | |
75 | + pageLink: EntityDataPageLink, | |
76 | + keyFilters: KeyFilter[]) { | |
76 | 77 | const datasource = listener.configDatasource; |
77 | 78 | if (datasource.type === DatasourceType.entity && (!datasource.entityFilter || !pageLink)) { |
78 | 79 | return; |
... | ... | @@ -89,7 +90,7 @@ export class EntityDataService { |
89 | 90 | private createSubscription(listener: EntityDataListener, |
90 | 91 | pageLink: EntityDataPageLink, |
91 | 92 | keyFilters: KeyFilter[], |
92 | - isLatestDataSubscription: boolean): EntityDataSubscription { | |
93 | + isPaginatedDataSubscription: boolean): EntityDataSubscription { | |
93 | 94 | const datasource = listener.configDatasource; |
94 | 95 | const subscriptionDataKeys: Array<SubscriptionDataKey> = []; |
95 | 96 | datasource.dataKeys.forEach((dataKey) => { |
... | ... | @@ -111,7 +112,7 @@ export class EntityDataService { |
111 | 112 | entityDataSubscriptionOptions.pageLink = pageLink; |
112 | 113 | entityDataSubscriptionOptions.keyFilters = keyFilters; |
113 | 114 | } |
114 | - entityDataSubscriptionOptions.isLatestDataSubscription = isLatestDataSubscription; | |
115 | + entityDataSubscriptionOptions.isPaginatedDataSubscription = isPaginatedDataSubscription; | |
115 | 116 | return new EntityDataSubscription(entityDataSubscriptionOptions, |
116 | 117 | listener, this.telemetryService, this.utils); |
117 | 118 | } | ... | ... |
... | ... | @@ -41,7 +41,7 @@ import { EntityAliases } from '@shared/models/alias.models'; |
41 | 41 | import { EntityInfo } from '@app/shared/models/entity.models'; |
42 | 42 | import { IDashboardComponent } from '@home/models/dashboard-component.models'; |
43 | 43 | import * as moment_ from 'moment'; |
44 | -import { EntityDataPageLink, EntityFilter, KeyFilter } from '@shared/models/query/query.models'; | |
44 | +import { EntityData, EntityDataPageLink, EntityFilter, KeyFilter } from '@shared/models/query/query.models'; | |
45 | 45 | import { EntityDataService } from '@core/api/entity-data.service'; |
46 | 46 | import { PageData } from '@shared/models/page/page-data'; |
47 | 47 | |
... | ... | @@ -185,6 +185,7 @@ export class WidgetSubscriptionContext { |
185 | 185 | export interface WidgetSubscriptionCallbacks { |
186 | 186 | onDataUpdated?: (subscription: IWidgetSubscription, detectChanges: boolean) => void; |
187 | 187 | onDataUpdateError?: (subscription: IWidgetSubscription, e: any) => void; |
188 | + onInitialPageDataChanged?: (subscription: IWidgetSubscription, nextPageData: PageData<EntityData>) => void; | |
188 | 189 | dataLoading?: (subscription: IWidgetSubscription) => void; |
189 | 190 | legendDataUpdated?: (subscription: IWidgetSubscription, detectChanges: boolean) => void; |
190 | 191 | timeWindowUpdated?: (subscription: IWidgetSubscription, timeWindowConfig: Timewindow) => void; |
... | ... | @@ -279,9 +280,9 @@ export interface IWidgetSubscription { |
279 | 280 | |
280 | 281 | subscribe(): void; |
281 | 282 | |
282 | - subscribeForLatestData(datasourceIndex: number, | |
283 | - pageLink: EntityDataPageLink, | |
284 | - keyFilters: KeyFilter[]): void; | |
283 | + subscribeForPaginatedData(datasourceIndex: number, | |
284 | + pageLink: EntityDataPageLink, | |
285 | + keyFilters: KeyFilter[]): void; | |
285 | 286 | |
286 | 287 | isDataResolved(): boolean; |
287 | 288 | ... | ... |
... | ... | @@ -209,6 +209,7 @@ export class WidgetSubscription implements IWidgetSubscription { |
209 | 209 | } else { |
210 | 210 | this.callbacks.onDataUpdated = this.callbacks.onDataUpdated || (() => {}); |
211 | 211 | this.callbacks.onDataUpdateError = this.callbacks.onDataUpdateError || (() => {}); |
212 | + this.callbacks.onInitialPageDataChanged = this.callbacks.onInitialPageDataChanged || (() => {}); | |
212 | 213 | this.callbacks.dataLoading = this.callbacks.dataLoading || (() => {}); |
213 | 214 | this.callbacks.legendDataUpdated = this.callbacks.legendDataUpdated || (() => {}); |
214 | 215 | this.callbacks.timeWindowUpdated = this.callbacks.timeWindowUpdated || (() => {}); |
... | ... | @@ -400,6 +401,7 @@ export class WidgetSubscription implements IWidgetSubscription { |
400 | 401 | dataLoaded: (pageData, data1, datasourceIndex) => { |
401 | 402 | this.dataLoaded(pageData, data1, datasourceIndex, true) |
402 | 403 | }, |
404 | + initialPageDataChanged: this.initialPageDataChanged.bind(this), | |
403 | 405 | dataUpdated: this.dataUpdated.bind(this), |
404 | 406 | updateRealtimeSubscription: () => { |
405 | 407 | this.subscriptionTimewindow = this.updateRealtimeSubscription(); |
... | ... | @@ -852,9 +854,9 @@ export class WidgetSubscription implements IWidgetSubscription { |
852 | 854 | } |
853 | 855 | } |
854 | 856 | |
855 | - subscribeForLatestData(datasourceIndex: number, | |
856 | - pageLink: EntityDataPageLink, | |
857 | - keyFilters: KeyFilter[]): void { | |
857 | + subscribeForPaginatedData(datasourceIndex: number, | |
858 | + pageLink: EntityDataPageLink, | |
859 | + keyFilters: KeyFilter[]): void { | |
858 | 860 | let entityDataListener = this.entityDataListeners[datasourceIndex]; |
859 | 861 | if (entityDataListener) { |
860 | 862 | this.ctx.entityDataService.stopSubscription(entityDataListener); |
... | ... | @@ -871,7 +873,7 @@ export class WidgetSubscription implements IWidgetSubscription { |
871 | 873 | dataUpdated: this.dataUpdated.bind(this) |
872 | 874 | }; |
873 | 875 | this.entityDataListeners[datasourceIndex] = entityDataListener; |
874 | - this.ctx.entityDataService.subscribeForLatestData(entityDataListener, pageLink, keyFilters); | |
876 | + this.ctx.entityDataService.subscribeForPaginatedData(entityDataListener, pageLink, keyFilters); | |
875 | 877 | } |
876 | 878 | } |
877 | 879 | |
... | ... | @@ -1156,6 +1158,10 @@ export class WidgetSubscription implements IWidgetSubscription { |
1156 | 1158 | return this.timewindowForComparison; |
1157 | 1159 | } |
1158 | 1160 | |
1161 | + private initialPageDataChanged(nextPageData: PageData<EntityData>) { | |
1162 | + this.callbacks.onInitialPageDataChanged(this, nextPageData); | |
1163 | + } | |
1164 | + | |
1159 | 1165 | private dataLoaded(pageData: PageData<EntityData>, |
1160 | 1166 | data: Array<Array<DataSetHolder>>, |
1161 | 1167 | datasourceIndex: number, isUpdate: boolean) { | ... | ... |
... | ... | @@ -614,7 +614,7 @@ class EntityDatasource implements DataSource<EntityData> { |
614 | 614 | } |
615 | 615 | |
616 | 616 | loadEntities(pageLink: EntityDataPageLink, keyFilters: KeyFilter[]) { |
617 | - this.subscription.subscribeForLatestData(0, pageLink, keyFilters); | |
617 | + this.subscription.subscribeForPaginatedData(0, pageLink, keyFilters); | |
618 | 618 | /* this.fetchEntities(pageLink).pipe( |
619 | 619 | catchError(() => of(emptyPageData<EntityData>())), |
620 | 620 | ).subscribe( | ... | ... |
... | ... | @@ -818,6 +818,9 @@ export class WidgetComponent extends PageComponent implements OnInit, AfterViewI |
818 | 818 | onDataUpdateError: (subscription, e) => { |
819 | 819 | this.handleWidgetException(e); |
820 | 820 | }, |
821 | + onInitialPageDataChanged: (subscription, nextPageData) => { | |
822 | + this.reInit(); | |
823 | + }, | |
821 | 824 | dataLoading: (subscription) => { |
822 | 825 | if (this.loadingData !== subscription.loadingData) { |
823 | 826 | this.loadingData = subscription.loadingData; | ... | ... |