Commit 62c9630e34ba476af4958e88111c524f770f52e3
1 parent
d2d95a25
Use generics to remove duplication for event fetcher. Use page data iterable
Showing
18 changed files
with
274 additions
and
210 deletions
... | ... | @@ -37,10 +37,10 @@ import org.thingsboard.server.common.data.EdgeUtils; |
37 | 37 | import org.thingsboard.server.common.data.EntityType; |
38 | 38 | import org.thingsboard.server.common.data.EntityView; |
39 | 39 | import org.thingsboard.server.common.data.EntityViewInfo; |
40 | -import org.thingsboard.server.common.data.OtaPackage; | |
41 | -import org.thingsboard.server.common.data.OtaPackageInfo; | |
42 | 40 | import org.thingsboard.server.common.data.HasName; |
43 | 41 | import org.thingsboard.server.common.data.HasTenantId; |
42 | +import org.thingsboard.server.common.data.OtaPackage; | |
43 | +import org.thingsboard.server.common.data.OtaPackageInfo; | |
44 | 44 | import org.thingsboard.server.common.data.TbResource; |
45 | 45 | import org.thingsboard.server.common.data.TbResourceInfo; |
46 | 46 | import org.thingsboard.server.common.data.Tenant; |
... | ... | @@ -70,7 +70,6 @@ import org.thingsboard.server.common.data.id.EntityIdFactory; |
70 | 70 | import org.thingsboard.server.common.data.id.EntityViewId; |
71 | 71 | import org.thingsboard.server.common.data.id.OtaPackageId; |
72 | 72 | import org.thingsboard.server.common.data.id.RpcId; |
73 | -import org.thingsboard.server.common.data.id.TbResourceId; | |
74 | 73 | import org.thingsboard.server.common.data.id.RuleChainId; |
75 | 74 | import org.thingsboard.server.common.data.id.RuleNodeId; |
76 | 75 | import org.thingsboard.server.common.data.id.TbResourceId; |
... | ... | @@ -79,7 +78,7 @@ import org.thingsboard.server.common.data.id.TenantProfileId; |
79 | 78 | import org.thingsboard.server.common.data.id.UserId; |
80 | 79 | import org.thingsboard.server.common.data.id.WidgetTypeId; |
81 | 80 | import org.thingsboard.server.common.data.id.WidgetsBundleId; |
82 | -import org.thingsboard.server.common.data.page.PageData; | |
81 | +import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId; | |
83 | 82 | import org.thingsboard.server.common.data.page.PageLink; |
84 | 83 | import org.thingsboard.server.common.data.page.SortOrder; |
85 | 84 | import org.thingsboard.server.common.data.page.TimePageLink; |
... | ... | @@ -105,10 +104,10 @@ import org.thingsboard.server.dao.edge.EdgeService; |
105 | 104 | import org.thingsboard.server.dao.entityview.EntityViewService; |
106 | 105 | import org.thingsboard.server.dao.exception.DataValidationException; |
107 | 106 | import org.thingsboard.server.dao.exception.IncorrectParameterException; |
108 | -import org.thingsboard.server.dao.ota.OtaPackageService; | |
109 | 107 | import org.thingsboard.server.dao.model.ModelConstants; |
110 | 108 | import org.thingsboard.server.dao.oauth2.OAuth2ConfigTemplateService; |
111 | 109 | import org.thingsboard.server.dao.oauth2.OAuth2Service; |
110 | +import org.thingsboard.server.dao.ota.OtaPackageService; | |
112 | 111 | import org.thingsboard.server.dao.relation.RelationService; |
113 | 112 | import org.thingsboard.server.dao.rpc.RpcService; |
114 | 113 | import org.thingsboard.server.dao.rule.RuleChainService; |
... | ... | @@ -125,11 +124,10 @@ import org.thingsboard.server.queue.provider.TbQueueProducerProvider; |
125 | 124 | import org.thingsboard.server.queue.util.TbCoreComponent; |
126 | 125 | import org.thingsboard.server.service.action.RuleEngineEntityActionService; |
127 | 126 | import org.thingsboard.server.service.component.ComponentDiscoveryService; |
128 | -import org.thingsboard.server.service.edge.rpc.EdgeRpcService; | |
129 | -import org.thingsboard.server.service.ota.OtaPackageStateService; | |
130 | 127 | import org.thingsboard.server.service.edge.EdgeNotificationService; |
131 | -import org.thingsboard.server.service.edge.rpc.EdgeGrpcService; | |
128 | +import org.thingsboard.server.service.edge.rpc.EdgeRpcService; | |
132 | 129 | import org.thingsboard.server.service.lwm2m.LwM2MServerSecurityInfoRepository; |
130 | +import org.thingsboard.server.service.ota.OtaPackageStateService; | |
133 | 131 | import org.thingsboard.server.service.profile.TbDeviceProfileCache; |
134 | 132 | import org.thingsboard.server.service.queue.TbClusterService; |
135 | 133 | import org.thingsboard.server.service.resource.TbResourceService; |
... | ... | @@ -923,18 +921,12 @@ public abstract class BaseController { |
923 | 921 | if (EntityType.EDGE.equals(entityId.getEntityType())) { |
924 | 922 | return Collections.singletonList(new EdgeId(entityId.getId())); |
925 | 923 | } |
924 | + PageDataIterableByTenantIdEntityId<EdgeId> relatedEdgeIdsIterator = | |
925 | + new PageDataIterableByTenantIdEntityId<>(edgeService::findRelatedEdgeIdsByEntityId, tenantId, entityId, DEFAULT_PAGE_SIZE); | |
926 | 926 | List<EdgeId> result = new ArrayList<>(); |
927 | - PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); | |
928 | - PageData<EdgeId> pageData; | |
929 | - do { | |
930 | - pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, pageLink); | |
931 | - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { | |
932 | - result.addAll(pageData.getData()); | |
933 | - if (pageData.hasNext()) { | |
934 | - pageLink = pageLink.nextPageLink(); | |
935 | - } | |
936 | - } | |
937 | - } while (pageData != null && pageData.hasNext()); | |
927 | + for(EdgeId edgeId : relatedEdgeIdsIterator) { | |
928 | + result.add(edgeId); | |
929 | + } | |
938 | 930 | return result; |
939 | 931 | } |
940 | 932 | ... | ... |
... | ... | @@ -50,6 +50,7 @@ import org.thingsboard.server.common.data.id.RuleChainId; |
50 | 50 | import org.thingsboard.server.common.data.id.RuleNodeId; |
51 | 51 | import org.thingsboard.server.common.data.id.TenantId; |
52 | 52 | import org.thingsboard.server.common.data.page.PageData; |
53 | +import org.thingsboard.server.common.data.page.PageDataIterableByTenant; | |
53 | 54 | import org.thingsboard.server.common.data.page.PageLink; |
54 | 55 | import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; |
55 | 56 | import org.thingsboard.server.common.data.rule.DefaultRuleChainCreateRequest; |
... | ... | @@ -645,17 +646,11 @@ public class RuleChainController extends BaseController { |
645 | 646 | try { |
646 | 647 | TenantId tenantId = getCurrentUser().getTenantId(); |
647 | 648 | List<RuleChain> result = new ArrayList<>(); |
648 | - PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); | |
649 | - PageData<RuleChain> pageData; | |
650 | - do { | |
651 | - pageData = ruleChainService.findAutoAssignToEdgeRuleChainsByTenantId(tenantId, pageLink); | |
652 | - if (pageData.getData().size() > 0) { | |
653 | - result.addAll(pageData.getData()); | |
654 | - if (pageData.hasNext()) { | |
655 | - pageLink = pageLink.nextPageLink(); | |
656 | - } | |
657 | - } | |
658 | - } while (pageData.hasNext()); | |
649 | + PageDataIterableByTenant<RuleChain> autoAssignRuleChainsIterator = | |
650 | + new PageDataIterableByTenant<>(ruleChainService::findAutoAssignToEdgeRuleChainsByTenantId, tenantId, DEFAULT_PAGE_SIZE); | |
651 | + for (RuleChain ruleChain : autoAssignRuleChainsIterator) { | |
652 | + result.add(ruleChain); | |
653 | + } | |
659 | 654 | return checkNotNull(result); |
660 | 655 | } catch (Exception e) { |
661 | 656 | throw handleException(e); | ... | ... |
... | ... | @@ -16,7 +16,6 @@ |
16 | 16 | package org.thingsboard.server.service.edge; |
17 | 17 | |
18 | 18 | import lombok.Data; |
19 | -import lombok.Getter; | |
20 | 19 | import org.springframework.beans.factory.annotation.Autowired; |
21 | 20 | import org.springframework.context.annotation.Lazy; |
22 | 21 | import org.springframework.stereotype.Component; |
... | ... | @@ -53,117 +52,90 @@ import org.thingsboard.server.service.executors.DbCallbackExecutorService; |
53 | 52 | @Component |
54 | 53 | @TbCoreComponent |
55 | 54 | @Data |
55 | +@Lazy | |
56 | 56 | public class EdgeContextComponent { |
57 | 57 | |
58 | - @Lazy | |
59 | 58 | @Autowired |
60 | 59 | private EdgeService edgeService; |
61 | 60 | |
62 | - @Lazy | |
63 | 61 | @Autowired |
64 | 62 | private EdgeEventService edgeEventService; |
65 | 63 | |
66 | - @Lazy | |
67 | 64 | @Autowired |
68 | 65 | private AdminSettingsService adminSettingsService; |
69 | 66 | |
70 | - @Lazy | |
71 | 67 | @Autowired |
72 | 68 | private AssetService assetService; |
73 | 69 | |
74 | - @Lazy | |
75 | 70 | @Autowired |
76 | 71 | private DeviceProfileService deviceProfileService; |
77 | 72 | |
78 | - @Lazy | |
79 | 73 | @Autowired |
80 | 74 | private AttributesService attributesService; |
81 | 75 | |
82 | - @Lazy | |
83 | 76 | @Autowired |
84 | 77 | private DashboardService dashboardService; |
85 | 78 | |
86 | - @Lazy | |
87 | 79 | @Autowired |
88 | 80 | private RuleChainService ruleChainService; |
89 | 81 | |
90 | - @Lazy | |
91 | 82 | @Autowired |
92 | 83 | private UserService userService; |
93 | 84 | |
94 | - @Lazy | |
95 | 85 | @Autowired |
96 | 86 | private WidgetsBundleService widgetsBundleService; |
97 | 87 | |
98 | - @Lazy | |
99 | 88 | @Autowired |
100 | 89 | private EdgeRequestsService edgeRequestsService; |
101 | 90 | |
102 | - @Lazy | |
103 | 91 | @Autowired |
104 | 92 | private AlarmEdgeProcessor alarmProcessor; |
105 | 93 | |
106 | - @Lazy | |
107 | 94 | @Autowired |
108 | 95 | private DeviceProfileEdgeProcessor deviceProfileProcessor; |
109 | 96 | |
110 | - @Lazy | |
111 | 97 | @Autowired |
112 | 98 | private DeviceEdgeProcessor deviceProcessor; |
113 | 99 | |
114 | - @Lazy | |
115 | 100 | @Autowired |
116 | 101 | private EntityEdgeProcessor entityProcessor; |
117 | 102 | |
118 | - @Lazy | |
119 | 103 | @Autowired |
120 | 104 | private AssetEdgeProcessor assetProcessor; |
121 | 105 | |
122 | - @Lazy | |
123 | 106 | @Autowired |
124 | 107 | private EntityViewEdgeProcessor entityViewProcessor; |
125 | 108 | |
126 | - @Lazy | |
127 | 109 | @Autowired |
128 | 110 | private UserEdgeProcessor userProcessor; |
129 | 111 | |
130 | - @Lazy | |
131 | 112 | @Autowired |
132 | 113 | private RelationEdgeProcessor relationProcessor; |
133 | 114 | |
134 | - @Lazy | |
135 | 115 | @Autowired |
136 | 116 | private TelemetryEdgeProcessor telemetryProcessor; |
137 | 117 | |
138 | - @Lazy | |
139 | 118 | @Autowired |
140 | 119 | private DashboardEdgeProcessor dashboardProcessor; |
141 | 120 | |
142 | - @Lazy | |
143 | 121 | @Autowired |
144 | 122 | private RuleChainEdgeProcessor ruleChainProcessor; |
145 | 123 | |
146 | - @Lazy | |
147 | 124 | @Autowired |
148 | 125 | private CustomerEdgeProcessor customerProcessor; |
149 | 126 | |
150 | - @Lazy | |
151 | 127 | @Autowired |
152 | 128 | private WidgetBundleEdgeProcessor widgetBundleProcessor; |
153 | 129 | |
154 | - @Lazy | |
155 | 130 | @Autowired |
156 | 131 | private WidgetTypeEdgeProcessor widgetTypeProcessor; |
157 | 132 | |
158 | - @Lazy | |
159 | 133 | @Autowired |
160 | 134 | private AdminSettingsEdgeProcessor adminSettingsProcessor; |
161 | 135 | |
162 | - @Lazy | |
163 | 136 | @Autowired |
164 | 137 | private EdgeEventStorageSettings edgeEventStorageSettings; |
165 | 138 | |
166 | 139 | @Autowired |
167 | - @Getter | |
168 | 140 | private DbCallbackExecutorService dbCallbackExecutor; |
169 | 141 | } | ... | ... |
... | ... | @@ -316,6 +316,7 @@ public final class EdgeGrpcSession implements Closeable { |
316 | 316 | if (ifOffset != null) { |
317 | 317 | Long newStartTs = Uuids.unixTimestamp(ifOffset); |
318 | 318 | updateQueueStartTs(newStartTs); |
319 | + log.debug("[{}] queue offset was updated [{}][{}]", this.sessionId, ifOffset, newStartTs); | |
319 | 320 | } |
320 | 321 | } |
321 | 322 | log.trace("[{}] processHandleMessages finished", this.sessionId); | ... | ... |
... | ... | @@ -5,7 +5,7 @@ |
5 | 5 | * you may not use this file except in compliance with the License. |
6 | 6 | * You may obtain a copy of the License at |
7 | 7 | * |
8 | - * http://www.apache.org/licenses/LICENSE-2.0 | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | 9 | * |
10 | 10 | * Unless required by applicable law or agreed to in writing, software |
11 | 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
... | ... | @@ -17,6 +17,7 @@ package org.thingsboard.server.service.edge.rpc.fetch; |
17 | 17 | |
18 | 18 | import com.datastax.oss.driver.api.core.uuid.Uuids; |
19 | 19 | import com.fasterxml.jackson.databind.JsonNode; |
20 | +import com.fasterxml.jackson.databind.ObjectMapper; | |
20 | 21 | import com.fasterxml.jackson.databind.node.ObjectNode; |
21 | 22 | import lombok.AllArgsConstructor; |
22 | 23 | import lombok.extern.slf4j.Slf4j; |
... | ... | @@ -47,11 +48,18 @@ import java.util.regex.Pattern; |
47 | 48 | |
48 | 49 | @AllArgsConstructor |
49 | 50 | @Slf4j |
50 | -public class AdminSettingsEdgeEventFetcher extends BasePageableEdgeEventFetcher { | |
51 | +public class AdminSettingsEdgeEventFetcher implements EdgeEventFetcher { | |
52 | + | |
53 | + private static final ObjectMapper mapper = new ObjectMapper(); | |
51 | 54 | |
52 | 55 | private final AdminSettingsService adminSettingsService; |
53 | 56 | |
54 | 57 | @Override |
58 | + public PageLink getPageLink(int pageSize) { | |
59 | + return new PageLink(pageSize); | |
60 | + } | |
61 | + | |
62 | + @Override | |
55 | 63 | public PageData<EdgeEvent> fetchEdgeEvents(TenantId tenantId, Edge edge, PageLink pageLink) throws Exception { |
56 | 64 | List<EdgeEvent> result = new ArrayList<>(); |
57 | 65 | ... | ... |
... | ... | @@ -28,26 +28,20 @@ import org.thingsboard.server.common.data.page.PageLink; |
28 | 28 | import org.thingsboard.server.dao.asset.AssetService; |
29 | 29 | import org.thingsboard.server.service.edge.rpc.EdgeEventUtils; |
30 | 30 | |
31 | -import java.util.ArrayList; | |
32 | -import java.util.List; | |
33 | - | |
34 | 31 | @AllArgsConstructor |
35 | 32 | @Slf4j |
36 | -public class AssetsEdgeEventFetcher extends BasePageableEdgeEventFetcher { | |
33 | +public class AssetsEdgeEventFetcher extends BasePageableEdgeEventFetcher<Asset> { | |
37 | 34 | |
38 | 35 | private final AssetService assetService; |
39 | 36 | |
40 | 37 | @Override |
41 | - public PageData<EdgeEvent> fetchEdgeEvents(TenantId tenantId, Edge edge, PageLink pageLink) { | |
42 | - log.trace("[{}] start fetching edge events [{}]", tenantId, edge.getId()); | |
43 | - PageData<Asset> pageData = assetService.findAssetsByTenantIdAndEdgeId(tenantId, edge.getId(), pageLink); | |
44 | - List<EdgeEvent> result = new ArrayList<>(); | |
45 | - if (!pageData.getData().isEmpty()) { | |
46 | - for (Asset asset : pageData.getData()) { | |
47 | - result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ASSET, | |
48 | - EdgeEventActionType.ADDED, asset.getId(), null)); | |
49 | - } | |
50 | - } | |
51 | - return new PageData<>(result, pageData.getTotalPages(), pageData.getTotalElements(), pageData.hasNext()); | |
38 | + PageData<Asset> fetchPageData(TenantId tenantId, Edge edge, PageLink pageLink) { | |
39 | + return assetService.findAssetsByTenantIdAndEdgeId(tenantId, edge.getId(), pageLink); | |
40 | + } | |
41 | + | |
42 | + @Override | |
43 | + EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, Asset asset) { | |
44 | + return EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ASSET, | |
45 | + EdgeEventActionType.ADDED, asset.getId(), null); | |
52 | 46 | } |
53 | 47 | } | ... | ... |
... | ... | @@ -16,14 +16,50 @@ |
16 | 16 | package org.thingsboard.server.service.edge.rpc.fetch; |
17 | 17 | |
18 | 18 | import com.fasterxml.jackson.databind.ObjectMapper; |
19 | +import lombok.extern.slf4j.Slf4j; | |
20 | +import org.thingsboard.server.common.data.BaseData; | |
21 | +import org.thingsboard.server.common.data.edge.Edge; | |
22 | +import org.thingsboard.server.common.data.edge.EdgeEvent; | |
23 | +import org.thingsboard.server.common.data.edge.EdgeEventActionType; | |
24 | +import org.thingsboard.server.common.data.edge.EdgeEventType; | |
25 | +import org.thingsboard.server.common.data.id.EntityId; | |
26 | +import org.thingsboard.server.common.data.id.EventId; | |
27 | +import org.thingsboard.server.common.data.id.HasId; | |
28 | +import org.thingsboard.server.common.data.id.HasUUID; | |
29 | +import org.thingsboard.server.common.data.id.IdBased; | |
30 | +import org.thingsboard.server.common.data.id.RuleChainId; | |
31 | +import org.thingsboard.server.common.data.id.TenantId; | |
32 | +import org.thingsboard.server.common.data.id.UUIDBased; | |
33 | +import org.thingsboard.server.common.data.page.PageData; | |
19 | 34 | import org.thingsboard.server.common.data.page.PageLink; |
35 | +import org.thingsboard.server.common.data.rule.RuleChain; | |
36 | +import org.thingsboard.server.service.edge.rpc.EdgeEventUtils; | |
20 | 37 | |
21 | -public abstract class BasePageableEdgeEventFetcher implements EdgeEventFetcher { | |
38 | +import java.util.ArrayList; | |
39 | +import java.util.List; | |
22 | 40 | |
23 | - protected static final ObjectMapper mapper = new ObjectMapper(); | |
41 | +@Slf4j | |
42 | +public abstract class BasePageableEdgeEventFetcher<T> implements EdgeEventFetcher { | |
24 | 43 | |
25 | 44 | @Override |
26 | 45 | public PageLink getPageLink(int pageSize) { |
27 | 46 | return new PageLink(pageSize); |
28 | 47 | } |
48 | + | |
49 | + @Override | |
50 | + public PageData<EdgeEvent> fetchEdgeEvents(TenantId tenantId, Edge edge, PageLink pageLink) { | |
51 | + log.trace("[{}] start fetching edge events [{}]", tenantId, edge.getId()); | |
52 | + PageData<T> pageData = fetchPageData(tenantId, edge, pageLink); | |
53 | + List<EdgeEvent> result = new ArrayList<>(); | |
54 | + if (!pageData.getData().isEmpty()) { | |
55 | + for (T entity : pageData.getData()) { | |
56 | + result.add(constructEdgeEvent(tenantId, edge, entity)); | |
57 | + } | |
58 | + } | |
59 | + return new PageData<>(result, pageData.getTotalPages(), pageData.getTotalElements(), pageData.hasNext()); | |
60 | + } | |
61 | + | |
62 | + abstract PageData<T> fetchPageData(TenantId tenantId, Edge edge, PageLink pageLink); | |
63 | + | |
64 | + abstract EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, T entity); | |
29 | 65 | } | ... | ... |
... | ... | @@ -28,27 +28,21 @@ import org.thingsboard.server.common.data.page.PageLink; |
28 | 28 | import org.thingsboard.server.dao.user.UserService; |
29 | 29 | import org.thingsboard.server.service.edge.rpc.EdgeEventUtils; |
30 | 30 | |
31 | -import java.util.ArrayList; | |
32 | -import java.util.List; | |
33 | - | |
34 | 31 | @Slf4j |
35 | 32 | @AllArgsConstructor |
36 | -public abstract class BaseUsersEdgeEventFetcher extends BasePageableEdgeEventFetcher { | |
33 | +public abstract class BaseUsersEdgeEventFetcher extends BasePageableEdgeEventFetcher<User> { | |
37 | 34 | |
38 | 35 | protected final UserService userService; |
39 | 36 | |
40 | 37 | @Override |
41 | - public PageData<EdgeEvent> fetchEdgeEvents(TenantId tenantId, Edge edge, PageLink pageLink) { | |
42 | - log.trace("[{}] start fetching edge events [{}]", tenantId, edge.getId()); | |
43 | - PageData<User> pageData = findUsers(tenantId, pageLink); | |
44 | - List<EdgeEvent> result = new ArrayList<>(); | |
45 | - if (!pageData.getData().isEmpty()) { | |
46 | - for (User user : pageData.getData()) { | |
47 | - result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER, | |
48 | - EdgeEventActionType.ADDED, user.getId(), null)); | |
49 | - } | |
50 | - } | |
51 | - return new PageData<>(result, pageData.getTotalPages(), pageData.getTotalElements(), pageData.hasNext()); | |
38 | + PageData<User> fetchPageData(TenantId tenantId, Edge edge, PageLink pageLink) { | |
39 | + return findUsers(tenantId, pageLink); | |
40 | + } | |
41 | + | |
42 | + @Override | |
43 | + EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, User user) { | |
44 | + return EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER, | |
45 | + EdgeEventActionType.ADDED, user.getId(), null); | |
52 | 46 | } |
53 | 47 | |
54 | 48 | protected abstract PageData<User> findUsers(TenantId tenantId, PageLink pageLink); | ... | ... |
... | ... | @@ -28,27 +28,21 @@ import org.thingsboard.server.common.data.widget.WidgetsBundle; |
28 | 28 | import org.thingsboard.server.dao.widget.WidgetsBundleService; |
29 | 29 | import org.thingsboard.server.service.edge.rpc.EdgeEventUtils; |
30 | 30 | |
31 | -import java.util.ArrayList; | |
32 | -import java.util.List; | |
33 | - | |
34 | 31 | @Slf4j |
35 | 32 | @AllArgsConstructor |
36 | -public abstract class BaseWidgetsBundlesEdgeEventFetcher extends BasePageableEdgeEventFetcher { | |
33 | +public abstract class BaseWidgetsBundlesEdgeEventFetcher extends BasePageableEdgeEventFetcher<WidgetsBundle> { | |
37 | 34 | |
38 | 35 | protected final WidgetsBundleService widgetsBundleService; |
39 | 36 | |
40 | 37 | @Override |
41 | - public PageData<EdgeEvent> fetchEdgeEvents(TenantId tenantId, Edge edge, PageLink pageLink) { | |
42 | - log.trace("[{}] start fetching edge events [{}]", tenantId, edge.getId()); | |
43 | - PageData<WidgetsBundle> pageData = findWidgetsBundles(tenantId, pageLink); | |
44 | - List<EdgeEvent> result = new ArrayList<>(); | |
45 | - if (!pageData.getData().isEmpty()) { | |
46 | - for (WidgetsBundle widgetsBundle : pageData.getData()) { | |
47 | - result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGETS_BUNDLE, | |
48 | - EdgeEventActionType.ADDED, widgetsBundle.getId(), null)); | |
49 | - } | |
50 | - } | |
51 | - return new PageData<>(result, pageData.getTotalPages(), pageData.getTotalElements(), pageData.hasNext()); | |
38 | + PageData<WidgetsBundle> fetchPageData(TenantId tenantId, Edge edge, PageLink pageLink) { | |
39 | + return findWidgetsBundles(tenantId, pageLink); | |
40 | + } | |
41 | + | |
42 | + @Override | |
43 | + EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, WidgetsBundle widgetsBundle) { | |
44 | + return EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGETS_BUNDLE, | |
45 | + EdgeEventActionType.ADDED, widgetsBundle.getId(), null); | |
52 | 46 | } |
53 | 47 | |
54 | 48 | protected abstract PageData<WidgetsBundle> findWidgetsBundles(TenantId tenantId, PageLink pageLink); | ... | ... |
... | ... | @@ -16,6 +16,8 @@ |
16 | 16 | package org.thingsboard.server.service.edge.rpc.fetch; |
17 | 17 | |
18 | 18 | import org.thingsboard.server.common.data.User; |
19 | +import org.thingsboard.server.common.data.edge.Edge; | |
20 | +import org.thingsboard.server.common.data.edge.EdgeEvent; | |
19 | 21 | import org.thingsboard.server.common.data.id.CustomerId; |
20 | 22 | import org.thingsboard.server.common.data.id.TenantId; |
21 | 23 | import org.thingsboard.server.common.data.page.PageData; |
... | ... | @@ -35,4 +37,5 @@ public class CustomerUsersEdgeEventFetcher extends BaseUsersEdgeEventFetcher { |
35 | 37 | protected PageData<User> findUsers(TenantId tenantId, PageLink pageLink) { |
36 | 38 | return userService.findCustomerUsers(tenantId, customerId, pageLink); |
37 | 39 | } |
40 | + | |
38 | 41 | } | ... | ... |
... | ... | @@ -28,26 +28,20 @@ import org.thingsboard.server.common.data.page.PageLink; |
28 | 28 | import org.thingsboard.server.dao.dashboard.DashboardService; |
29 | 29 | import org.thingsboard.server.service.edge.rpc.EdgeEventUtils; |
30 | 30 | |
31 | -import java.util.ArrayList; | |
32 | -import java.util.List; | |
33 | - | |
34 | 31 | @AllArgsConstructor |
35 | 32 | @Slf4j |
36 | -public class DashboardsEdgeEventFetcher extends BasePageableEdgeEventFetcher { | |
33 | +public class DashboardsEdgeEventFetcher extends BasePageableEdgeEventFetcher<DashboardInfo> { | |
37 | 34 | |
38 | 35 | private final DashboardService dashboardService; |
39 | 36 | |
40 | 37 | @Override |
41 | - public PageData<EdgeEvent> fetchEdgeEvents(TenantId tenantId, Edge edge, PageLink pageLink) { | |
42 | - log.trace("[{}] start fetching edge events [{}]", tenantId, edge.getId()); | |
43 | - PageData<DashboardInfo> pageData = dashboardService.findDashboardsByTenantIdAndEdgeId(tenantId, edge.getId(), pageLink); | |
44 | - List<EdgeEvent> result = new ArrayList<>(); | |
45 | - if (!pageData.getData().isEmpty()) { | |
46 | - for (DashboardInfo dashboardInfo : pageData.getData()) { | |
47 | - result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.DASHBOARD, | |
48 | - EdgeEventActionType.ADDED, dashboardInfo.getId(), null)); | |
49 | - } | |
50 | - } | |
51 | - return new PageData<>(result, pageData.getTotalPages(), pageData.getTotalElements(), pageData.hasNext()); | |
38 | + PageData<DashboardInfo> fetchPageData(TenantId tenantId, Edge edge, PageLink pageLink) { | |
39 | + return dashboardService.findDashboardsByTenantIdAndEdgeId(tenantId, edge.getId(), pageLink); | |
40 | + } | |
41 | + | |
42 | + @Override | |
43 | + EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, DashboardInfo dashboardInfo) { | |
44 | + return EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.DASHBOARD, | |
45 | + EdgeEventActionType.ADDED, dashboardInfo.getId(), null); | |
52 | 46 | } |
53 | 47 | } | ... | ... |
... | ... | @@ -28,26 +28,20 @@ import org.thingsboard.server.common.data.page.PageLink; |
28 | 28 | import org.thingsboard.server.dao.device.DeviceProfileService; |
29 | 29 | import org.thingsboard.server.service.edge.rpc.EdgeEventUtils; |
30 | 30 | |
31 | -import java.util.ArrayList; | |
32 | -import java.util.List; | |
33 | - | |
34 | 31 | @AllArgsConstructor |
35 | 32 | @Slf4j |
36 | -public class DeviceProfilesEdgeEventFetcher extends BasePageableEdgeEventFetcher { | |
33 | +public class DeviceProfilesEdgeEventFetcher extends BasePageableEdgeEventFetcher<DeviceProfile> { | |
37 | 34 | |
38 | 35 | private final DeviceProfileService deviceProfileService; |
39 | 36 | |
40 | 37 | @Override |
41 | - public PageData<EdgeEvent> fetchEdgeEvents(TenantId tenantId, Edge edge, PageLink pageLink) { | |
42 | - log.trace("[{}] start fetching edge events [{}]", tenantId, edge.getId()); | |
43 | - PageData<DeviceProfile> pageData = deviceProfileService.findDeviceProfiles(tenantId, pageLink); | |
44 | - List<EdgeEvent> result = new ArrayList<>(); | |
45 | - if (!pageData.getData().isEmpty()) { | |
46 | - for (DeviceProfile deviceProfile : pageData.getData()) { | |
47 | - result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE_PROFILE, | |
48 | - EdgeEventActionType.ADDED, deviceProfile.getId(), null)); | |
49 | - } | |
50 | - } | |
51 | - return new PageData<>(result, pageData.getTotalPages(), pageData.getTotalElements(), pageData.hasNext()); | |
38 | + PageData<DeviceProfile> fetchPageData(TenantId tenantId, Edge edge, PageLink pageLink) { | |
39 | + return deviceProfileService.findDeviceProfiles(tenantId, pageLink); | |
40 | + } | |
41 | + | |
42 | + @Override | |
43 | + EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, DeviceProfile deviceProfile) { | |
44 | + return EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE_PROFILE, | |
45 | + EdgeEventActionType.ADDED, deviceProfile.getId(), null); | |
52 | 46 | } |
53 | 47 | } | ... | ... |
... | ... | @@ -5,7 +5,7 @@ |
5 | 5 | * you may not use this file except in compliance with the License. |
6 | 6 | * You may obtain a copy of the License at |
7 | 7 | * |
8 | - * http://www.apache.org/licenses/LICENSE-2.0 | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | 9 | * |
10 | 10 | * Unless required by applicable law or agreed to in writing, software |
11 | 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
... | ... | @@ -28,26 +28,20 @@ import org.thingsboard.server.common.data.rule.RuleChain; |
28 | 28 | import org.thingsboard.server.dao.rule.RuleChainService; |
29 | 29 | import org.thingsboard.server.service.edge.rpc.EdgeEventUtils; |
30 | 30 | |
31 | -import java.util.ArrayList; | |
32 | -import java.util.List; | |
33 | - | |
34 | 31 | @Slf4j |
35 | 32 | @AllArgsConstructor |
36 | -public class RuleChainsEdgeEventFetcher extends BasePageableEdgeEventFetcher { | |
33 | +public class RuleChainsEdgeEventFetcher extends BasePageableEdgeEventFetcher<RuleChain> { | |
37 | 34 | |
38 | 35 | private final RuleChainService ruleChainService; |
39 | 36 | |
40 | 37 | @Override |
41 | - public PageData<EdgeEvent> fetchEdgeEvents(TenantId tenantId, Edge edge, PageLink pageLink) { | |
42 | - log.trace("[{}] start fetching edge events [{}]", tenantId, edge.getId()); | |
43 | - PageData<RuleChain> pageData = ruleChainService.findRuleChainsByTenantIdAndEdgeId(tenantId, edge.getId(), pageLink); | |
44 | - List<EdgeEvent> result = new ArrayList<>(); | |
45 | - if (!pageData.getData().isEmpty()) { | |
46 | - for (RuleChain ruleChain : pageData.getData()) { | |
47 | - result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.RULE_CHAIN, | |
48 | - EdgeEventActionType.ADDED, ruleChain.getId(), null)); | |
49 | - } | |
50 | - } | |
51 | - return new PageData<>(result, pageData.getTotalPages(), pageData.getTotalElements(), pageData.hasNext()); | |
38 | + PageData<RuleChain> fetchPageData(TenantId tenantId, Edge edge, PageLink pageLink) { | |
39 | + return ruleChainService.findRuleChainsByTenantIdAndEdgeId(tenantId, edge.getId(), pageLink); | |
40 | + } | |
41 | + | |
42 | + @Override | |
43 | + EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, RuleChain ruleChain) { | |
44 | + return EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.RULE_CHAIN, | |
45 | + EdgeEventActionType.ADDED, ruleChain.getId(), null); | |
52 | 46 | } |
53 | 47 | } | ... | ... |
... | ... | @@ -16,6 +16,8 @@ |
16 | 16 | package org.thingsboard.server.service.edge.rpc.fetch; |
17 | 17 | |
18 | 18 | import lombok.extern.slf4j.Slf4j; |
19 | +import org.thingsboard.server.common.data.edge.Edge; | |
20 | +import org.thingsboard.server.common.data.edge.EdgeEvent; | |
19 | 21 | import org.thingsboard.server.common.data.id.TenantId; |
20 | 22 | import org.thingsboard.server.common.data.page.PageData; |
21 | 23 | import org.thingsboard.server.common.data.page.PageLink; |
... | ... | @@ -23,7 +25,7 @@ import org.thingsboard.server.common.data.widget.WidgetsBundle; |
23 | 25 | import org.thingsboard.server.dao.widget.WidgetsBundleService; |
24 | 26 | |
25 | 27 | @Slf4j |
26 | -public class TenantWidgetsBundlesEdgeEventFetcher extends BaseWidgetsBundlesEdgeEventFetcher implements EdgeEventFetcher { | |
28 | +public class TenantWidgetsBundlesEdgeEventFetcher extends BaseWidgetsBundlesEdgeEventFetcher { | |
27 | 29 | |
28 | 30 | public TenantWidgetsBundlesEdgeEventFetcher(WidgetsBundleService widgetsBundleService) { |
29 | 31 | super(widgetsBundleService); | ... | ... |
common/data/src/main/java/org/thingsboard/server/common/data/page/BasePageDataIterable.java
0 → 100644
1 | +package org.thingsboard.server.common.data.page; | |
2 | + | |
3 | +import java.util.Iterator; | |
4 | +import java.util.List; | |
5 | +import java.util.NoSuchElementException; | |
6 | + | |
7 | +public abstract class BasePageDataIterable<T> implements Iterable<T>, Iterator<T> { | |
8 | + | |
9 | + private final int fetchSize; | |
10 | + | |
11 | + private List<T> currentItems; | |
12 | + private int currentIdx; | |
13 | + private boolean hasNextPack; | |
14 | + private PageLink nextPackLink; | |
15 | + private boolean initialized; | |
16 | + | |
17 | + public BasePageDataIterable(int fetchSize) { | |
18 | + super(); | |
19 | + this.fetchSize = fetchSize; | |
20 | + } | |
21 | + | |
22 | + @Override | |
23 | + public Iterator<T> iterator() { | |
24 | + return this; | |
25 | + } | |
26 | + | |
27 | + @Override | |
28 | + public boolean hasNext() { | |
29 | + if (!initialized) { | |
30 | + fetch(new PageLink(fetchSize)); | |
31 | + initialized = true; | |
32 | + } | |
33 | + if (currentIdx == currentItems.size()) { | |
34 | + if (hasNextPack) { | |
35 | + fetch(nextPackLink); | |
36 | + } | |
37 | + } | |
38 | + return currentIdx < currentItems.size(); | |
39 | + } | |
40 | + | |
41 | + @Override | |
42 | + public T next() { | |
43 | + if (!hasNext()) { | |
44 | + throw new NoSuchElementException(); | |
45 | + } | |
46 | + return currentItems.get(currentIdx++); | |
47 | + } | |
48 | + | |
49 | + private void fetch(PageLink link) { | |
50 | + PageData<T> pageData = fetchPageData(link); | |
51 | + currentIdx = 0; | |
52 | + currentItems = pageData.getData(); | |
53 | + hasNextPack = pageData.hasNext(); | |
54 | + nextPackLink = link.nextPageLink(); | |
55 | + } | |
56 | + | |
57 | + abstract PageData<T> fetchPageData(PageLink link); | |
58 | +} | ... | ... |
... | ... | @@ -5,7 +5,7 @@ |
5 | 5 | * you may not use this file except in compliance with the License. |
6 | 6 | * You may obtain a copy of the License at |
7 | 7 | * |
8 | - * http://www.apache.org/licenses/LICENSE-2.0 | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | 9 | * |
10 | 10 | * Unless required by applicable law or agreed to in writing, software |
11 | 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
... | ... | @@ -15,70 +15,21 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.common.data.page; |
17 | 17 | |
18 | -import java.util.Iterator; | |
19 | -import java.util.List; | |
20 | -import java.util.NoSuchElementException; | |
21 | - | |
22 | -import org.thingsboard.server.common.data.BaseData; | |
23 | -import org.thingsboard.server.common.data.SearchTextBased; | |
24 | -import org.thingsboard.server.common.data.id.EntityId; | |
25 | -import org.thingsboard.server.common.data.id.UUIDBased; | |
26 | - | |
27 | -public class PageDataIterable<T> implements Iterable<T>, Iterator<T> { | |
18 | +public class PageDataIterable<T> extends BasePageDataIterable<T> { | |
28 | 19 | |
29 | 20 | private final FetchFunction<T> function; |
30 | - private final int fetchSize; | |
31 | - | |
32 | - private List<T> currentItems; | |
33 | - private int currentIdx; | |
34 | - private boolean hasNextPack; | |
35 | - private PageLink nextPackLink; | |
36 | - private boolean initialized; | |
37 | 21 | |
38 | 22 | public PageDataIterable(FetchFunction<T> function, int fetchSize) { |
39 | - super(); | |
23 | + super(fetchSize); | |
40 | 24 | this.function = function; |
41 | - this.fetchSize = fetchSize; | |
42 | 25 | } |
43 | 26 | |
44 | 27 | @Override |
45 | - public Iterator<T> iterator() { | |
46 | - return this; | |
28 | + PageData<T> fetchPageData(PageLink link) { | |
29 | + return function.fetch(link); | |
47 | 30 | } |
48 | 31 | |
49 | - @Override | |
50 | - public boolean hasNext() { | |
51 | - if(!initialized){ | |
52 | - fetch(new PageLink(fetchSize)); | |
53 | - initialized = true; | |
54 | - } | |
55 | - if(currentIdx == currentItems.size()){ | |
56 | - if(hasNextPack){ | |
57 | - fetch(nextPackLink); | |
58 | - } | |
59 | - } | |
60 | - return currentIdx < currentItems.size(); | |
61 | - } | |
62 | - | |
63 | - private void fetch(PageLink link) { | |
64 | - PageData<T> pageData = function.fetch(link); | |
65 | - currentIdx = 0; | |
66 | - currentItems = pageData.getData(); | |
67 | - hasNextPack = pageData.hasNext(); | |
68 | - nextPackLink = link.nextPageLink(); | |
69 | - } | |
70 | - | |
71 | - @Override | |
72 | - public T next() { | |
73 | - if(!hasNext()){ | |
74 | - throw new NoSuchElementException(); | |
75 | - } | |
76 | - return currentItems.get(currentIdx++); | |
77 | - } | |
78 | - | |
79 | - public static interface FetchFunction<T> { | |
80 | - | |
32 | + public interface FetchFunction<T> { | |
81 | 33 | PageData<T> fetch(PageLink link); |
82 | - | |
83 | 34 | } |
84 | 35 | } | ... | ... |
common/data/src/main/java/org/thingsboard/server/common/data/page/PageDataIterableByTenant.java
0 → 100644
1 | +/** | |
2 | + * Copyright © 2016-2021 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.data.page; | |
17 | + | |
18 | +import org.thingsboard.server.common.data.id.TenantId; | |
19 | + | |
20 | +public class PageDataIterableByTenant<T> extends BasePageDataIterable<T> { | |
21 | + | |
22 | + private final FetchFunction<T> function; | |
23 | + private final TenantId tenantId; | |
24 | + | |
25 | + public PageDataIterableByTenant(FetchFunction<T> function, TenantId tenantId, int fetchSize) { | |
26 | + super(fetchSize); | |
27 | + this.function = function; | |
28 | + this.tenantId = tenantId; | |
29 | + } | |
30 | + | |
31 | + @Override | |
32 | + PageData<T> fetchPageData(PageLink link) { | |
33 | + return function.fetch(tenantId, link); | |
34 | + } | |
35 | + | |
36 | + public interface FetchFunction<T> { | |
37 | + PageData<T> fetch(TenantId tenantId, PageLink link); | |
38 | + } | |
39 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2021 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.data.page; | |
17 | + | |
18 | +import org.thingsboard.server.common.data.id.EntityId; | |
19 | +import org.thingsboard.server.common.data.id.TenantId; | |
20 | + | |
21 | +public class PageDataIterableByTenantIdEntityId<T> extends BasePageDataIterable<T> { | |
22 | + | |
23 | + private final FetchFunction<T> function; | |
24 | + private final TenantId tenantId; | |
25 | + private final EntityId entityId; | |
26 | + | |
27 | + public PageDataIterableByTenantIdEntityId(FetchFunction<T> function, TenantId tenantId, EntityId entityId, int fetchSize) { | |
28 | + super(fetchSize); | |
29 | + this.function = function; | |
30 | + this.tenantId = tenantId; | |
31 | + this.entityId = entityId; | |
32 | + | |
33 | + } | |
34 | + | |
35 | + @Override | |
36 | + PageData<T> fetchPageData(PageLink link) { | |
37 | + return function.fetch(tenantId, entityId, link); | |
38 | + } | |
39 | + | |
40 | + public interface FetchFunction<T> { | |
41 | + PageData<T> fetch(TenantId tenantId, EntityId entityId, PageLink link); | |
42 | + } | |
43 | +} | ... | ... |