Showing
8 changed files
with
133 additions
and
348 deletions
... | ... | @@ -306,88 +306,4 @@ public class UserController extends BaseController { |
306 | 306 | throw handleException(e); |
307 | 307 | } |
308 | 308 | } |
309 | - | |
310 | - @PreAuthorize("hasAuthority('TENANT_ADMIN')") | |
311 | - @RequestMapping(value = "/edge/{edgeId}/user/{userId}", method = RequestMethod.POST) | |
312 | - @ResponseBody | |
313 | - public User assignUserToEdge(@PathVariable(EDGE_ID) String strEdgeId, | |
314 | - @PathVariable(USER_ID) String strUserId) throws ThingsboardException { | |
315 | - checkParameter(EDGE_ID, strEdgeId); | |
316 | - checkParameter(USER_ID, strUserId); | |
317 | - try { | |
318 | - EdgeId edgeId = new EdgeId(toUUID(strEdgeId)); | |
319 | - Edge edge = checkEdgeId(edgeId, Operation.READ); | |
320 | - | |
321 | - UserId userId = new UserId(toUUID(strUserId)); | |
322 | - checkUserId(userId, Operation.ASSIGN_TO_EDGE); | |
323 | - | |
324 | - User savedUser = checkNotNull(userService.assignUserToEdge(getTenantId(), userId, edgeId)); | |
325 | - | |
326 | - logEntityAction(userId, savedUser, | |
327 | - savedUser.getCustomerId(), | |
328 | - ActionType.ASSIGNED_TO_EDGE, null, strUserId, strEdgeId, edge.getName()); | |
329 | - | |
330 | - return savedUser; | |
331 | - } catch (Exception e) { | |
332 | - | |
333 | - logEntityAction(emptyId(EntityType.USER), null, | |
334 | - null, | |
335 | - ActionType.ASSIGNED_TO_EDGE, e, strUserId, strEdgeId); | |
336 | - | |
337 | - throw handleException(e); | |
338 | - } | |
339 | - } | |
340 | - | |
341 | - @PreAuthorize("hasAuthority('TENANT_ADMIN')") | |
342 | - @RequestMapping(value = "/edge/{edgeId}/user/{userId}", method = RequestMethod.DELETE) | |
343 | - @ResponseBody | |
344 | - public User unassignUserFromEdge(@PathVariable(EDGE_ID) String strEdgeId, | |
345 | - @PathVariable(USER_ID) String strUserId) throws ThingsboardException { | |
346 | - checkParameter(EDGE_ID, strEdgeId); | |
347 | - checkParameter(USER_ID, strUserId); | |
348 | - try { | |
349 | - EdgeId edgeId = new EdgeId(toUUID(strEdgeId)); | |
350 | - Edge edge = checkEdgeId(edgeId, Operation.READ); | |
351 | - | |
352 | - UserId userId = new UserId(toUUID(strUserId)); | |
353 | - User user = checkUserId(userId, Operation.UNASSIGN_FROM_EDGE); | |
354 | - | |
355 | - User savedUser = checkNotNull(userService.unassignUserFromEdge(getTenantId(), userId, edgeId)); | |
356 | - | |
357 | - logEntityAction(userId, savedUser, | |
358 | - savedUser.getCustomerId(), | |
359 | - ActionType.UNASSIGNED_FROM_EDGE, null, strUserId, edge.getId().toString(), edge.getName()); | |
360 | - | |
361 | - return savedUser; | |
362 | - } catch (Exception e) { | |
363 | - | |
364 | - logEntityAction(emptyId(EntityType.USER), null, | |
365 | - null, | |
366 | - ActionType.UNASSIGNED_FROM_EDGE, e, strUserId); | |
367 | - | |
368 | - throw handleException(e); | |
369 | - } | |
370 | - } | |
371 | - | |
372 | - @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") | |
373 | - @RequestMapping(value = "/edge/{edgeId}/users", params = {"limit"}, method = RequestMethod.GET) | |
374 | - @ResponseBody | |
375 | - public TimePageData<User> getEdgeUsers( | |
376 | - @PathVariable(EDGE_ID) String strEdgeId, | |
377 | - @RequestParam int limit, | |
378 | - @RequestParam(required = false) Long startTime, | |
379 | - @RequestParam(required = false) Long endTime, | |
380 | - @RequestParam(required = false, defaultValue = "false") boolean ascOrder, | |
381 | - @RequestParam(required = false) String offset) throws ThingsboardException { | |
382 | - checkParameter(EDGE_ID, strEdgeId); | |
383 | - try { | |
384 | - TenantId tenantId = getCurrentUser().getTenantId(); | |
385 | - EdgeId edgeId = new EdgeId(toUUID(strEdgeId)); | |
386 | - checkEdgeId(edgeId, Operation.READ); | |
387 | - TimePageLink pageLink = createPageLink(limit, startTime, endTime, ascOrder, offset); | |
388 | - return checkNotNull(userService.findUsersByTenantIdAndEdgeId(tenantId, edgeId, pageLink).get()); | |
389 | - } catch (Exception e) { | |
390 | - throw handleException(e); | |
391 | - } | |
392 | - } | |
393 | 309 | } | ... | ... |
... | ... | @@ -17,6 +17,7 @@ package org.thingsboard.server.service.edge.rpc.init; |
17 | 17 | |
18 | 18 | import com.google.common.util.concurrent.Futures; |
19 | 19 | import com.google.common.util.concurrent.ListenableFuture; |
20 | +import com.google.common.util.concurrent.MoreExecutors; | |
20 | 21 | import io.grpc.stub.StreamObserver; |
21 | 22 | import lombok.extern.slf4j.Slf4j; |
22 | 23 | import org.springframework.beans.factory.annotation.Autowired; |
... | ... | @@ -30,6 +31,8 @@ import org.thingsboard.server.common.data.asset.Asset; |
30 | 31 | import org.thingsboard.server.common.data.edge.Edge; |
31 | 32 | import org.thingsboard.server.common.data.id.EntityId; |
32 | 33 | import org.thingsboard.server.common.data.id.RuleChainId; |
34 | +import org.thingsboard.server.common.data.page.TextPageData; | |
35 | +import org.thingsboard.server.common.data.page.TextPageLink; | |
33 | 36 | import org.thingsboard.server.common.data.page.TimePageData; |
34 | 37 | import org.thingsboard.server.common.data.page.TimePageLink; |
35 | 38 | import org.thingsboard.server.common.data.relation.EntityRelation; |
... | ... | @@ -121,53 +124,57 @@ public class DefaultSyncEdgeService implements SyncEdgeService { |
121 | 124 | @Override |
122 | 125 | public void sync(EdgeContextComponent ctx, Edge edge, StreamObserver<ResponseMsg> outputStream) { |
123 | 126 | Set<EntityId> pushedEntityIds = new HashSet<>(); |
124 | - syncRuleChains(edge, pushedEntityIds, outputStream); | |
125 | - syncDevices(edge, pushedEntityIds, outputStream); | |
126 | - syncAssets(edge, pushedEntityIds, outputStream); | |
127 | - syncEntityViews(edge, pushedEntityIds, outputStream); | |
128 | - syncDashboards(edge, pushedEntityIds, outputStream); | |
129 | 127 | syncUsers(ctx, edge, pushedEntityIds, outputStream); |
130 | - syncRelations(ctx, edge, pushedEntityIds, outputStream); | |
128 | + List<ListenableFuture<Void>> futures = new ArrayList<>(); | |
129 | + futures.add(syncRuleChains(ctx, edge, pushedEntityIds, outputStream)); | |
130 | + futures.add(syncDevices(ctx, edge, pushedEntityIds, outputStream)); | |
131 | + futures.add(syncAssets(ctx, edge, pushedEntityIds, outputStream)); | |
132 | + futures.add(syncEntityViews(ctx, edge, pushedEntityIds, outputStream)); | |
133 | + futures.add(syncDashboards(ctx, edge, pushedEntityIds, outputStream)); | |
134 | + ListenableFuture<List<Void>> joinFuture = Futures.allAsList(futures); | |
135 | + Futures.transform(joinFuture, result -> { | |
136 | + syncRelations(ctx, edge, pushedEntityIds, outputStream); | |
137 | + return null; | |
138 | + }, MoreExecutors.directExecutor()); | |
131 | 139 | } |
132 | 140 | |
133 | - private void syncRuleChains(Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) { | |
141 | + private ListenableFuture<Void> syncRuleChains(EdgeContextComponent ctx, Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) { | |
134 | 142 | try { |
135 | - TimePageLink pageLink = new TimePageLink(100); | |
136 | - TimePageData<RuleChain> pageData; | |
137 | - do { | |
138 | - pageData = ruleChainService.findRuleChainsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink).get(); | |
139 | - if (!pageData.getData().isEmpty()) { | |
140 | - log.trace("[{}] [{}] rule chains(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); | |
141 | - for (RuleChain ruleChain : pageData.getData()) { | |
142 | - RuleChainUpdateMsg ruleChainUpdateMsg = | |
143 | - ruleChainUpdateMsgConstructor.constructRuleChainUpdatedMsg( | |
144 | - edge.getRootRuleChainId(), | |
145 | - UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, | |
146 | - ruleChain); | |
147 | - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() | |
148 | - .setRuleChainUpdateMsg(ruleChainUpdateMsg) | |
149 | - .build(); | |
150 | - outputStream.onNext(ResponseMsg.newBuilder() | |
151 | - .setEntityUpdateMsg(entityUpdateMsg) | |
152 | - .build()); | |
153 | - pushedEntityIds.add(ruleChain.getId()); | |
143 | + ListenableFuture<TimePageData<RuleChain>> future = ruleChainService.findRuleChainsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), new TimePageLink(Integer.MAX_VALUE)); | |
144 | + return Futures.transform(future, pageData -> { | |
145 | + try { | |
146 | + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { | |
147 | + log.trace("[{}] [{}] rule chains(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); | |
148 | + for (RuleChain ruleChain : pageData.getData()) { | |
149 | + RuleChainUpdateMsg ruleChainUpdateMsg = | |
150 | + ruleChainUpdateMsgConstructor.constructRuleChainUpdatedMsg( | |
151 | + edge.getRootRuleChainId(), | |
152 | + UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, | |
153 | + ruleChain); | |
154 | + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() | |
155 | + .setRuleChainUpdateMsg(ruleChainUpdateMsg) | |
156 | + .build(); | |
157 | + outputStream.onNext(ResponseMsg.newBuilder() | |
158 | + .setEntityUpdateMsg(entityUpdateMsg) | |
159 | + .build()); | |
160 | + pushedEntityIds.add(ruleChain.getId()); | |
161 | + } | |
154 | 162 | } |
163 | + } catch (Exception e) { | |
164 | + log.error("Exception during loading edge rule chain(s) on sync!", e); | |
155 | 165 | } |
156 | - if (pageData.hasNext()) { | |
157 | - pageLink = pageData.getNextPageLink(); | |
158 | - } | |
159 | - } while (pageData.hasNext()); | |
166 | + return null; | |
167 | + }, ctx.getDbCallbackExecutor()); | |
160 | 168 | } catch (Exception e) { |
161 | 169 | log.error("Exception during loading edge rule chain(s) on sync!", e); |
170 | + return Futures.immediateFuture(null); | |
162 | 171 | } |
163 | 172 | } |
164 | 173 | |
165 | - private void syncDevices(Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) { | |
174 | + private ListenableFuture<Void> syncDevices(EdgeContextComponent ctx, Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) { | |
166 | 175 | try { |
167 | - TimePageLink pageLink = new TimePageLink(100); | |
168 | - TimePageData<Device> pageData; | |
169 | - do { | |
170 | - pageData = deviceService.findDevicesByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink).get(); | |
176 | + ListenableFuture<TimePageData<Device>> future = deviceService.findDevicesByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), new TimePageLink(Integer.MAX_VALUE)); | |
177 | + return Futures.transform(future, pageData -> { | |
171 | 178 | if (!pageData.getData().isEmpty()) { |
172 | 179 | log.trace("[{}] [{}] device(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); |
173 | 180 | for (Device device : pageData.getData()) { |
... | ... | @@ -184,22 +191,19 @@ public class DefaultSyncEdgeService implements SyncEdgeService { |
184 | 191 | pushedEntityIds.add(device.getId()); |
185 | 192 | } |
186 | 193 | } |
187 | - if (pageData.hasNext()) { | |
188 | - pageLink = pageData.getNextPageLink(); | |
189 | - } | |
190 | - } while (pageData.hasNext()); | |
194 | + return null; | |
195 | + }, ctx.getDbCallbackExecutor()); | |
191 | 196 | } catch (Exception e) { |
192 | 197 | log.error("Exception during loading edge device(s) on sync!", e); |
198 | + return Futures.immediateFuture(null); | |
193 | 199 | } |
194 | 200 | } |
195 | 201 | |
196 | - private void syncAssets(Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) { | |
202 | + private ListenableFuture<Void> syncAssets(EdgeContextComponent ctx, Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) { | |
197 | 203 | try { |
198 | - TimePageLink pageLink = new TimePageLink(100); | |
199 | - TimePageData<Asset> pageData; | |
200 | - do { | |
201 | - pageData = assetService.findAssetsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink).get(); | |
202 | - if (!pageData.getData().isEmpty()) { | |
204 | + ListenableFuture<TimePageData<Asset>> future = assetService.findAssetsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), new TimePageLink(Integer.MAX_VALUE)); | |
205 | + return Futures.transform(future, pageData -> { | |
206 | + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { | |
203 | 207 | log.trace("[{}] [{}] asset(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); |
204 | 208 | for (Asset asset : pageData.getData()) { |
205 | 209 | AssetUpdateMsg assetUpdateMsg = |
... | ... | @@ -215,110 +219,112 @@ public class DefaultSyncEdgeService implements SyncEdgeService { |
215 | 219 | pushedEntityIds.add(asset.getId()); |
216 | 220 | } |
217 | 221 | } |
218 | - if (pageData.hasNext()) { | |
219 | - pageLink = pageData.getNextPageLink(); | |
220 | - } | |
221 | - } while (pageData.hasNext()); | |
222 | + return null; | |
223 | + }, ctx.getDbCallbackExecutor()); | |
222 | 224 | } catch (Exception e) { |
223 | 225 | log.error("Exception during loading edge asset(s) on sync!", e); |
226 | + return Futures.immediateFuture(null); | |
224 | 227 | } |
225 | 228 | } |
226 | 229 | |
227 | - private void syncEntityViews(Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) { | |
230 | + private ListenableFuture<Void> syncEntityViews(EdgeContextComponent ctx, Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) { | |
228 | 231 | try { |
229 | - TimePageLink pageLink = new TimePageLink(100); | |
230 | - TimePageData<EntityView> pageData; | |
231 | - do { | |
232 | - pageData = entityViewService.findEntityViewsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink).get(); | |
233 | - if (!pageData.getData().isEmpty()) { | |
234 | - log.trace("[{}] [{}] entity view(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); | |
235 | - for (EntityView entityView : pageData.getData()) { | |
236 | - EntityViewUpdateMsg entityViewUpdateMsg = | |
237 | - entityViewUpdateMsgConstructor.constructEntityViewUpdatedMsg( | |
238 | - UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, | |
239 | - entityView); | |
240 | - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() | |
241 | - .setEntityViewUpdateMsg(entityViewUpdateMsg) | |
242 | - .build(); | |
243 | - outputStream.onNext(ResponseMsg.newBuilder() | |
244 | - .setEntityUpdateMsg(entityUpdateMsg) | |
245 | - .build()); | |
246 | - pushedEntityIds.add(entityView.getId()); | |
232 | + ListenableFuture<TimePageData<EntityView>> future = entityViewService.findEntityViewsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), new TimePageLink(Integer.MAX_VALUE)); | |
233 | + return Futures.transform(future, pageData -> { | |
234 | + try { | |
235 | + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { | |
236 | + log.trace("[{}] [{}] entity view(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); | |
237 | + for (EntityView entityView : pageData.getData()) { | |
238 | + EntityViewUpdateMsg entityViewUpdateMsg = | |
239 | + entityViewUpdateMsgConstructor.constructEntityViewUpdatedMsg( | |
240 | + UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, | |
241 | + entityView); | |
242 | + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() | |
243 | + .setEntityViewUpdateMsg(entityViewUpdateMsg) | |
244 | + .build(); | |
245 | + outputStream.onNext(ResponseMsg.newBuilder() | |
246 | + .setEntityUpdateMsg(entityUpdateMsg) | |
247 | + .build()); | |
248 | + pushedEntityIds.add(entityView.getId()); | |
249 | + } | |
247 | 250 | } |
251 | + } catch (Exception e) { | |
252 | + log.error("Exception during loading edge entity view(s) on sync!", e); | |
248 | 253 | } |
249 | - if (pageData.hasNext()) { | |
250 | - pageLink = pageData.getNextPageLink(); | |
251 | - } | |
252 | - } while (pageData.hasNext()); | |
254 | + return null; | |
255 | + }, ctx.getDbCallbackExecutor()); | |
253 | 256 | } catch (Exception e) { |
254 | 257 | log.error("Exception during loading edge entity view(s) on sync!", e); |
258 | + return Futures.immediateFuture(null); | |
255 | 259 | } |
256 | 260 | } |
257 | 261 | |
258 | - private void syncDashboards(Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) { | |
262 | + private ListenableFuture<Void> syncDashboards(EdgeContextComponent ctx, Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) { | |
259 | 263 | try { |
260 | - TimePageLink pageLink = new TimePageLink(100); | |
261 | - TimePageData<DashboardInfo> pageData; | |
262 | - do { | |
263 | - pageData = dashboardService.findDashboardsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink).get(); | |
264 | - if (!pageData.getData().isEmpty()) { | |
265 | - log.trace("[{}] [{}] dashboard(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); | |
266 | - for (DashboardInfo dashboardInfo : pageData.getData()) { | |
267 | - Dashboard dashboard = dashboardService.findDashboardById(edge.getTenantId(), dashboardInfo.getId()); | |
268 | - DashboardUpdateMsg dashboardUpdateMsg = | |
269 | - dashboardUpdateMsgConstructor.constructDashboardUpdatedMsg( | |
270 | - UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, | |
271 | - dashboard); | |
272 | - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() | |
273 | - .setDashboardUpdateMsg(dashboardUpdateMsg) | |
274 | - .build(); | |
275 | - outputStream.onNext(ResponseMsg.newBuilder() | |
276 | - .setEntityUpdateMsg(entityUpdateMsg) | |
277 | - .build()); | |
278 | - pushedEntityIds.add(dashboard.getId()); | |
264 | + ListenableFuture<TimePageData<DashboardInfo>> future = dashboardService.findDashboardsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), new TimePageLink(Integer.MAX_VALUE)); | |
265 | + return Futures.transform(future, pageData -> { | |
266 | + try { | |
267 | + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { | |
268 | + log.trace("[{}] [{}] dashboard(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); | |
269 | + for (DashboardInfo dashboardInfo : pageData.getData()) { | |
270 | + Dashboard dashboard = dashboardService.findDashboardById(edge.getTenantId(), dashboardInfo.getId()); | |
271 | + DashboardUpdateMsg dashboardUpdateMsg = | |
272 | + dashboardUpdateMsgConstructor.constructDashboardUpdatedMsg( | |
273 | + UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, | |
274 | + dashboard); | |
275 | + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() | |
276 | + .setDashboardUpdateMsg(dashboardUpdateMsg) | |
277 | + .build(); | |
278 | + outputStream.onNext(ResponseMsg.newBuilder() | |
279 | + .setEntityUpdateMsg(entityUpdateMsg) | |
280 | + .build()); | |
281 | + pushedEntityIds.add(dashboard.getId()); | |
282 | + } | |
279 | 283 | } |
284 | + } catch (Exception e) { | |
285 | + log.error("Exception during loading edge dashboard(s) on sync!", e); | |
280 | 286 | } |
281 | - if (pageData.hasNext()) { | |
282 | - pageLink = pageData.getNextPageLink(); | |
283 | - } | |
284 | - } while (pageData.hasNext()); | |
287 | + return null; | |
288 | + }, ctx.getDbCallbackExecutor()); | |
285 | 289 | } catch (Exception e) { |
286 | 290 | log.error("Exception during loading edge dashboard(s) on sync!", e); |
291 | + return Futures.immediateFuture(null); | |
287 | 292 | } |
288 | 293 | } |
289 | 294 | |
290 | 295 | private void syncUsers(EdgeContextComponent ctx, Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) { |
291 | 296 | try { |
292 | - TimePageLink pageLink = new TimePageLink(100); | |
293 | - TimePageData<User> pageData; | |
294 | - do { | |
295 | - pageData = userService.findUsersByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink).get(); | |
296 | - if (!pageData.getData().isEmpty()) { | |
297 | - log.trace("[{}] [{}] user(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); | |
298 | - for (User user : pageData.getData()) { | |
299 | - UserUpdateMsg userUpdateMsg = | |
300 | - userUpdateMsgConstructor.constructUserUpdatedMsg( | |
301 | - UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, | |
302 | - user); | |
303 | - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() | |
304 | - .setUserUpdateMsg(userUpdateMsg) | |
305 | - .build(); | |
306 | - outputStream.onNext(ResponseMsg.newBuilder() | |
307 | - .setEntityUpdateMsg(entityUpdateMsg) | |
308 | - .build()); | |
309 | - pushedEntityIds.add(user.getId()); | |
310 | - } | |
311 | - } | |
312 | - if (pageData.hasNext()) { | |
313 | - pageLink = pageData.getNextPageLink(); | |
314 | - } | |
315 | - } while (pageData.hasNext()); | |
297 | + TextPageData<User> pageData = userService.findTenantAdmins(edge.getTenantId(), new TextPageLink(Integer.MAX_VALUE)); | |
298 | + pushUsersToEdge(pageData, edge, pushedEntityIds, outputStream); | |
299 | + if (edge.getCustomerId() != null && !EntityId.NULL_UUID.equals(edge.getCustomerId().getId())) { | |
300 | + pageData = userService.findCustomerUsers(edge.getTenantId(), edge.getCustomerId(), new TextPageLink(Integer.MAX_VALUE)); | |
301 | + pushUsersToEdge(pageData, edge, pushedEntityIds, outputStream); | |
302 | + } | |
316 | 303 | } catch (Exception e) { |
317 | 304 | log.error("Exception during loading edge user(s) on sync!", e); |
318 | 305 | } |
319 | 306 | } |
320 | 307 | |
321 | - private void syncRelations(EdgeContextComponent ctx, Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) { | |
308 | + private void pushUsersToEdge(TextPageData<User> pageData, Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) { | |
309 | + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { | |
310 | + log.trace("[{}] [{}] user(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); | |
311 | + for (User user : pageData.getData()) { | |
312 | + UserUpdateMsg userUpdateMsg = | |
313 | + userUpdateMsgConstructor.constructUserUpdatedMsg( | |
314 | + UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, | |
315 | + user); | |
316 | + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() | |
317 | + .setUserUpdateMsg(userUpdateMsg) | |
318 | + .build(); | |
319 | + outputStream.onNext(ResponseMsg.newBuilder() | |
320 | + .setEntityUpdateMsg(entityUpdateMsg) | |
321 | + .build()); | |
322 | + pushedEntityIds.add(user.getId()); | |
323 | + } | |
324 | + } | |
325 | + } | |
326 | + | |
327 | + private ListenableFuture<Void> syncRelations(EdgeContextComponent ctx, Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) { | |
322 | 328 | if (!pushedEntityIds.isEmpty()) { |
323 | 329 | List<ListenableFuture<List<EntityRelation>>> futures = new ArrayList<>(); |
324 | 330 | for (EntityId entityId : pushedEntityIds) { |
... | ... | @@ -326,7 +332,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService { |
326 | 332 | futures.add(syncRelations(edge, entityId, EntitySearchDirection.TO)); |
327 | 333 | } |
328 | 334 | ListenableFuture<List<List<EntityRelation>>> relationsListFuture = Futures.allAsList(futures); |
329 | - Futures.transform(relationsListFuture, relationsList -> { | |
335 | + return Futures.transform(relationsListFuture, relationsList -> { | |
330 | 336 | try { |
331 | 337 | Set<EntityRelation> uniqueEntityRelations = new HashSet<>(); |
332 | 338 | if (!relationsList.isEmpty()) { |
... | ... | @@ -360,6 +366,8 @@ public class DefaultSyncEdgeService implements SyncEdgeService { |
360 | 366 | } |
361 | 367 | return null; |
362 | 368 | }, ctx.getDbCallbackExecutor()); |
369 | + } else { | |
370 | + return Futures.immediateFuture(null); | |
363 | 371 | } |
364 | 372 | } |
365 | 373 | |
... | ... | @@ -369,7 +377,6 @@ public class DefaultSyncEdgeService implements SyncEdgeService { |
369 | 377 | return relationService.findByQuery(edge.getTenantId(), query); |
370 | 378 | } |
371 | 379 | |
372 | - | |
373 | 380 | @Override |
374 | 381 | public void syncRuleChainMetadata(Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg, StreamObserver<ResponseMsg> outputStream) { |
375 | 382 | if (ruleChainMetadataRequestMsg.getRuleChainIdMSB() != 0 && ruleChainMetadataRequestMsg.getRuleChainIdLSB() != 0) { | ... | ... |
... | ... | @@ -18,14 +18,11 @@ package org.thingsboard.server.dao.user; |
18 | 18 | import com.google.common.util.concurrent.ListenableFuture; |
19 | 19 | import org.thingsboard.server.common.data.User; |
20 | 20 | import org.thingsboard.server.common.data.id.CustomerId; |
21 | -import org.thingsboard.server.common.data.id.EdgeId; | |
22 | 21 | import org.thingsboard.server.common.data.id.TenantId; |
23 | 22 | import org.thingsboard.server.common.data.id.UserCredentialsId; |
24 | 23 | import org.thingsboard.server.common.data.id.UserId; |
25 | 24 | import org.thingsboard.server.common.data.page.TextPageData; |
26 | 25 | import org.thingsboard.server.common.data.page.TextPageLink; |
27 | -import org.thingsboard.server.common.data.page.TimePageData; | |
28 | -import org.thingsboard.server.common.data.page.TimePageLink; | |
29 | 26 | import org.thingsboard.server.common.data.security.UserCredentials; |
30 | 27 | |
31 | 28 | public interface UserService { |
... | ... | @@ -69,10 +66,4 @@ public interface UserService { |
69 | 66 | void onUserLoginSuccessful(TenantId tenantId, UserId userId); |
70 | 67 | |
71 | 68 | int onUserLoginIncorrectCredentials(TenantId tenantId, UserId userId); |
72 | - | |
73 | - User assignUserToEdge(TenantId tenantId, UserId userId, EdgeId edgeId); | |
74 | - | |
75 | - User unassignUserFromEdge(TenantId tenantId, UserId userId, EdgeId edgeId); | |
76 | - | |
77 | - ListenableFuture<TimePageData<User>> findUsersByTenantIdAndEdgeId(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink); | |
78 | 69 | } | ... | ... |
... | ... | @@ -5,7 +5,7 @@ |
5 | 5 | * you may not use this file except in compliance with the License. |
6 | 6 | * You may obtain a copy of the License at |
7 | 7 | * |
8 | - * http://www.apache.org/licenses/LICENSE-2.0 | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | 9 | * |
10 | 10 | * Unless required by applicable law or agreed to in writing, software |
11 | 11 | * distributed under the License is distributed on an "AS IS" BASIS, | ... | ... |
... | ... | @@ -15,31 +15,21 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.dao.sql.user; |
17 | 17 | |
18 | -import com.google.common.util.concurrent.Futures; | |
19 | -import com.google.common.util.concurrent.ListenableFuture; | |
20 | -import com.google.common.util.concurrent.MoreExecutors; | |
21 | 18 | import lombok.extern.slf4j.Slf4j; |
22 | 19 | import org.springframework.beans.factory.annotation.Autowired; |
23 | 20 | import org.springframework.data.domain.PageRequest; |
24 | 21 | import org.springframework.data.repository.CrudRepository; |
25 | 22 | import org.springframework.stereotype.Component; |
26 | -import org.thingsboard.server.common.data.EntityType; | |
27 | 23 | import org.thingsboard.server.common.data.User; |
28 | -import org.thingsboard.server.common.data.id.EdgeId; | |
29 | 24 | import org.thingsboard.server.common.data.id.TenantId; |
30 | 25 | import org.thingsboard.server.common.data.page.TextPageLink; |
31 | -import org.thingsboard.server.common.data.page.TimePageLink; | |
32 | -import org.thingsboard.server.common.data.relation.EntityRelation; | |
33 | -import org.thingsboard.server.common.data.relation.RelationTypeGroup; | |
34 | 26 | import org.thingsboard.server.common.data.security.Authority; |
35 | 27 | import org.thingsboard.server.dao.DaoUtil; |
36 | 28 | import org.thingsboard.server.dao.model.sql.UserEntity; |
37 | -import org.thingsboard.server.dao.relation.RelationDao; | |
38 | 29 | import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao; |
39 | 30 | import org.thingsboard.server.dao.user.UserDao; |
40 | 31 | import org.thingsboard.server.dao.util.SqlDao; |
41 | 32 | |
42 | -import java.util.ArrayList; | |
43 | 33 | import java.util.List; |
44 | 34 | import java.util.Objects; |
45 | 35 | import java.util.UUID; |
... | ... | @@ -58,9 +48,6 @@ public class JpaUserDao extends JpaAbstractSearchTextDao<UserEntity, User> imple |
58 | 48 | @Autowired |
59 | 49 | private UserRepository userRepository; |
60 | 50 | |
61 | - @Autowired | |
62 | - private RelationDao relationDao; | |
63 | - | |
64 | 51 | @Override |
65 | 52 | protected Class<UserEntity> getEntityClass() { |
66 | 53 | return UserEntity.class; |
... | ... | @@ -102,17 +89,4 @@ public class JpaUserDao extends JpaAbstractSearchTextDao<UserEntity, User> imple |
102 | 89 | PageRequest.of(0, pageLink.getLimit()))); |
103 | 90 | |
104 | 91 | } |
105 | - | |
106 | - @Override | |
107 | - public ListenableFuture<List<User>> findUsersByTenantIdAndEdgeId(UUID tenantId, UUID edgeId, TimePageLink pageLink) { | |
108 | - log.debug("Try to find users by tenantId [{}], edgeId [{}] and pageLink [{}]", tenantId, edgeId, pageLink); | |
109 | - ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(new TenantId(tenantId), new EdgeId(edgeId), EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE, EntityType.USER, pageLink); | |
110 | - return Futures.transformAsync(relations, input -> { | |
111 | - List<ListenableFuture<User>> userFutures = new ArrayList<>(input.size()); | |
112 | - for (EntityRelation relation : input) { | |
113 | - userFutures.add(findByIdAsync(new TenantId(tenantId), relation.getTo().getId())); | |
114 | - } | |
115 | - return Futures.successfulAsList(userFutures); | |
116 | - }, MoreExecutors.directExecutor()); | |
117 | - } | |
118 | 92 | } | ... | ... |
... | ... | @@ -16,30 +16,18 @@ |
16 | 16 | package org.thingsboard.server.dao.user; |
17 | 17 | |
18 | 18 | import com.datastax.driver.core.querybuilder.Select.Where; |
19 | -import com.google.common.util.concurrent.Futures; | |
20 | -import com.google.common.util.concurrent.ListenableFuture; | |
21 | -import com.google.common.util.concurrent.MoreExecutors; | |
22 | 19 | import lombok.extern.slf4j.Slf4j; |
23 | -import org.springframework.beans.factory.annotation.Autowired; | |
24 | 20 | import org.springframework.stereotype.Component; |
25 | -import org.thingsboard.server.common.data.EntityType; | |
26 | 21 | import org.thingsboard.server.common.data.User; |
27 | -import org.thingsboard.server.common.data.asset.Asset; | |
28 | -import org.thingsboard.server.common.data.id.EdgeId; | |
29 | 22 | import org.thingsboard.server.common.data.id.TenantId; |
30 | 23 | import org.thingsboard.server.common.data.page.TextPageLink; |
31 | -import org.thingsboard.server.common.data.page.TimePageLink; | |
32 | -import org.thingsboard.server.common.data.relation.EntityRelation; | |
33 | -import org.thingsboard.server.common.data.relation.RelationTypeGroup; | |
34 | 24 | import org.thingsboard.server.common.data.security.Authority; |
35 | 25 | import org.thingsboard.server.dao.DaoUtil; |
36 | 26 | import org.thingsboard.server.dao.model.ModelConstants; |
37 | 27 | import org.thingsboard.server.dao.model.nosql.UserEntity; |
38 | 28 | import org.thingsboard.server.dao.nosql.CassandraAbstractSearchTextDao; |
39 | -import org.thingsboard.server.dao.relation.RelationDao; | |
40 | 29 | import org.thingsboard.server.dao.util.NoSqlDao; |
41 | 30 | |
42 | -import java.util.ArrayList; | |
43 | 31 | import java.util.Arrays; |
44 | 32 | import java.util.List; |
45 | 33 | import java.util.UUID; |
... | ... | @@ -52,9 +40,6 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.select; |
52 | 40 | @NoSqlDao |
53 | 41 | public class CassandraUserDao extends CassandraAbstractSearchTextDao<UserEntity, User> implements UserDao { |
54 | 42 | |
55 | - @Autowired | |
56 | - private RelationDao relationDao; | |
57 | - | |
58 | 43 | @Override |
59 | 44 | protected Class<UserEntity> getColumnFamilyClass() { |
60 | 45 | return UserEntity.class; |
... | ... | @@ -100,17 +85,4 @@ public class CassandraUserDao extends CassandraAbstractSearchTextDao<UserEntity, |
100 | 85 | log.trace("Found customer users [{}] by tenantId [{}], customerId [{}] and pageLink [{}]", userEntities, tenantId, customerId, pageLink); |
101 | 86 | return DaoUtil.convertDataList(userEntities); |
102 | 87 | } |
103 | - | |
104 | - @Override | |
105 | - public ListenableFuture<List<User>> findUsersByTenantIdAndEdgeId(UUID tenantId, UUID edgeId, TimePageLink pageLink) { | |
106 | - log.debug("Try to find users by tenantId [{}], edgeId [{}] and pageLink [{}]", tenantId, edgeId, pageLink); | |
107 | - ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(new TenantId(tenantId), new EdgeId(edgeId), EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE, EntityType.USER, pageLink); | |
108 | - return Futures.transformAsync(relations, input -> { | |
109 | - List<ListenableFuture<User>> userFutures = new ArrayList<>(input.size()); | |
110 | - for (EntityRelation relation : input) { | |
111 | - userFutures.add(findByIdAsync(new TenantId(tenantId), relation.getTo().getId())); | |
112 | - } | |
113 | - return Futures.successfulAsList(userFutures); | |
114 | - }, MoreExecutors.directExecutor()); | |
115 | - } | |
116 | 88 | } | ... | ... |
... | ... | @@ -15,11 +15,9 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.dao.user; |
17 | 17 | |
18 | -import com.google.common.util.concurrent.ListenableFuture; | |
19 | 18 | import org.thingsboard.server.common.data.User; |
20 | 19 | import org.thingsboard.server.common.data.id.TenantId; |
21 | 20 | import org.thingsboard.server.common.data.page.TextPageLink; |
22 | -import org.thingsboard.server.common.data.page.TimePageLink; | |
23 | 21 | import org.thingsboard.server.dao.Dao; |
24 | 22 | |
25 | 23 | import java.util.List; |
... | ... | @@ -42,7 +40,7 @@ public interface UserDao extends Dao<User> { |
42 | 40 | * @return the user entity |
43 | 41 | */ |
44 | 42 | User findByEmail(TenantId tenantId, String email); |
45 | - | |
43 | + | |
46 | 44 | /** |
47 | 45 | * Find tenant admin users by tenantId and page link. |
48 | 46 | * |
... | ... | @@ -51,7 +49,7 @@ public interface UserDao extends Dao<User> { |
51 | 49 | * @return the list of user entities |
52 | 50 | */ |
53 | 51 | List<User> findTenantAdmins(UUID tenantId, TextPageLink pageLink); |
54 | - | |
52 | + | |
55 | 53 | /** |
56 | 54 | * Find customer users by tenantId, customerId and page link. |
57 | 55 | * |
... | ... | @@ -61,15 +59,4 @@ public interface UserDao extends Dao<User> { |
61 | 59 | * @return the list of user entities |
62 | 60 | */ |
63 | 61 | List<User> findCustomerUsers(UUID tenantId, UUID customerId, TextPageLink pageLink); |
64 | - | |
65 | - /** | |
66 | - * Find users by tenantId, edgeId and page link. | |
67 | - * | |
68 | - * @param tenantId the tenantId | |
69 | - * @param edgeId the edgeId | |
70 | - * @param pageLink the page link | |
71 | - * @return the list of user objects | |
72 | - */ | |
73 | - ListenableFuture<List<User>> findUsersByTenantIdAndEdgeId(UUID tenantId, UUID edgeId, TimePageLink pageLink); | |
74 | - | |
75 | 62 | } | ... | ... |
... | ... | @@ -18,10 +18,7 @@ package org.thingsboard.server.dao.user; |
18 | 18 | import com.fasterxml.jackson.databind.JsonNode; |
19 | 19 | import com.fasterxml.jackson.databind.ObjectMapper; |
20 | 20 | import com.fasterxml.jackson.databind.node.ObjectNode; |
21 | -import com.google.common.base.Function; | |
22 | -import com.google.common.util.concurrent.Futures; | |
23 | 21 | import com.google.common.util.concurrent.ListenableFuture; |
24 | -import com.google.common.util.concurrent.MoreExecutors; | |
25 | 22 | import lombok.extern.slf4j.Slf4j; |
26 | 23 | import org.apache.commons.lang3.RandomStringUtils; |
27 | 24 | import org.apache.commons.lang3.StringUtils; |
... | ... | @@ -31,18 +28,12 @@ import org.springframework.stereotype.Service; |
31 | 28 | import org.thingsboard.server.common.data.Customer; |
32 | 29 | import org.thingsboard.server.common.data.Tenant; |
33 | 30 | import org.thingsboard.server.common.data.User; |
34 | -import org.thingsboard.server.common.data.edge.Edge; | |
35 | 31 | import org.thingsboard.server.common.data.id.CustomerId; |
36 | -import org.thingsboard.server.common.data.id.EdgeId; | |
37 | 32 | import org.thingsboard.server.common.data.id.TenantId; |
38 | 33 | import org.thingsboard.server.common.data.id.UserCredentialsId; |
39 | 34 | import org.thingsboard.server.common.data.id.UserId; |
40 | 35 | import org.thingsboard.server.common.data.page.TextPageData; |
41 | 36 | import org.thingsboard.server.common.data.page.TextPageLink; |
42 | -import org.thingsboard.server.common.data.page.TimePageData; | |
43 | -import org.thingsboard.server.common.data.page.TimePageLink; | |
44 | -import org.thingsboard.server.common.data.relation.EntityRelation; | |
45 | -import org.thingsboard.server.common.data.relation.RelationTypeGroup; | |
46 | 37 | import org.thingsboard.server.common.data.security.Authority; |
47 | 38 | import org.thingsboard.server.common.data.security.UserCredentials; |
48 | 39 | import org.thingsboard.server.dao.customer.CustomerDao; |
... | ... | @@ -55,11 +46,9 @@ import org.thingsboard.server.dao.service.DataValidator; |
55 | 46 | import org.thingsboard.server.dao.service.PaginatedRemover; |
56 | 47 | import org.thingsboard.server.dao.tenant.TenantDao; |
57 | 48 | |
58 | -import javax.annotation.Nullable; | |
59 | 49 | import java.util.HashMap; |
60 | 50 | import java.util.List; |
61 | 51 | import java.util.Map; |
62 | -import java.util.concurrent.ExecutionException; | |
63 | 52 | |
64 | 53 | import static org.thingsboard.server.dao.service.Validator.validateId; |
65 | 54 | import static org.thingsboard.server.dao.service.Validator.validatePageLink; |
... | ... | @@ -327,57 +316,6 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic |
327 | 316 | return failedLoginAttempts; |
328 | 317 | } |
329 | 318 | |
330 | - @Override | |
331 | - public User assignUserToEdge(TenantId tenantId, UserId userId, EdgeId edgeId) { | |
332 | - User user = findUserById(tenantId, userId); | |
333 | - Edge edge = edgeService.findEdgeById(tenantId, edgeId); | |
334 | - if (edge == null) { | |
335 | - throw new DataValidationException("Can't assign user to non-existent edge!"); | |
336 | - } | |
337 | - if (!edge.getTenantId().getId().equals(user.getTenantId().getId())) { | |
338 | - throw new DataValidationException("Can't assign user to edge from different tenant!"); | |
339 | - } | |
340 | - try { | |
341 | - createRelation(tenantId, new EntityRelation(edgeId, userId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE)); | |
342 | - } catch (ExecutionException | InterruptedException e) { | |
343 | - log.warn("[{}] Failed to create user relation. Edge Id: [{}]", userId, edgeId); | |
344 | - throw new RuntimeException(e); | |
345 | - } | |
346 | - return user; | |
347 | - } | |
348 | - | |
349 | - @Override | |
350 | - public User unassignUserFromEdge(TenantId tenantId, UserId userId, EdgeId edgeId) { | |
351 | - User user = findUserById(tenantId, userId); | |
352 | - Edge edge = edgeService.findEdgeById(tenantId, edgeId); | |
353 | - if (edge == null) { | |
354 | - throw new DataValidationException("Can't unassign user from non-existent edge!"); | |
355 | - } | |
356 | - try { | |
357 | - deleteRelation(tenantId, new EntityRelation(edgeId, userId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE)); | |
358 | - } catch (ExecutionException | InterruptedException e) { | |
359 | - log.warn("[{}] Failed to delete user relation. Edge Id: [{}]", userId, edgeId); | |
360 | - throw new RuntimeException(e); | |
361 | - } | |
362 | - return user; | |
363 | - } | |
364 | - | |
365 | - @Override | |
366 | - public ListenableFuture<TimePageData<User>> findUsersByTenantIdAndEdgeId(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink) { | |
367 | - log.trace("Executing findUsersByTenantIdAndEdgeId, tenantId [{}], edgeId [{}], pageLink [{}]", tenantId, edgeId, pageLink); | |
368 | - validateId(tenantId, INCORRECT_TENANT_ID + tenantId); | |
369 | - validateId(edgeId, INCORRECT_EDGE_ID + edgeId); | |
370 | - validatePageLink(pageLink, INCORRECT_PAGE_LINK + pageLink); | |
371 | - ListenableFuture<List<User>> users = userDao.findUsersByTenantIdAndEdgeId(tenantId.getId(), edgeId.getId(), pageLink); | |
372 | - return Futures.transform(users, new Function<List<User>, TimePageData<User>>() { | |
373 | - @Nullable | |
374 | - @Override | |
375 | - public TimePageData<User> apply(@Nullable List<User> users) { | |
376 | - return new TimePageData<>(users, pageLink); | |
377 | - } | |
378 | - }, MoreExecutors.directExecutor()); | |
379 | - } | |
380 | - | |
381 | 319 | private int increaseFailedLoginAttempts(User user) { |
382 | 320 | JsonNode additionalInfo = user.getAdditionalInfo(); |
383 | 321 | if (!(additionalInfo instanceof ObjectNode)) { | ... | ... |