Commit 3fc18988be0cb633b0da8ef08272333a91c8a237

Authored by Volodymyr Babak
1 parent 3a416131

Added update relation msg functionality

... ... @@ -35,13 +35,17 @@ import org.thingsboard.rule.engine.api.MailService;
35 35 import org.thingsboard.server.common.data.EntityType;
36 36 import org.thingsboard.server.common.data.User;
37 37 import org.thingsboard.server.common.data.audit.ActionType;
  38 +import org.thingsboard.server.common.data.edge.Edge;
38 39 import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
39 40 import org.thingsboard.server.common.data.exception.ThingsboardException;
40 41 import org.thingsboard.server.common.data.id.CustomerId;
  42 +import org.thingsboard.server.common.data.id.EdgeId;
41 43 import org.thingsboard.server.common.data.id.TenantId;
42 44 import org.thingsboard.server.common.data.id.UserId;
43 45 import org.thingsboard.server.common.data.page.TextPageData;
44 46 import org.thingsboard.server.common.data.page.TextPageLink;
  47 +import org.thingsboard.server.common.data.page.TimePageData;
  48 +import org.thingsboard.server.common.data.page.TimePageLink;
45 49 import org.thingsboard.server.common.data.security.Authority;
46 50 import org.thingsboard.server.common.data.security.UserCredentials;
47 51 import org.thingsboard.server.queue.util.TbCoreComponent;
... ... @@ -56,6 +60,8 @@ import org.thingsboard.server.utils.MiscUtils;
56 60
57 61 import javax.servlet.http.HttpServletRequest;
58 62
  63 +import static org.thingsboard.server.controller.EdgeController.EDGE_ID;
  64 +
59 65 @RestController
60 66 @TbCoreComponent
61 67 @RequestMapping("/api")
... ... @@ -300,4 +306,88 @@ public class UserController extends BaseController {
300 306 throw handleException(e);
301 307 }
302 308 }
  309 +
  310 + @PreAuthorize("hasAuthority('TENANT_ADMIN')")
  311 + @RequestMapping(value = "/edge/{edgeId}/user/{userId}", method = RequestMethod.POST)
  312 + @ResponseBody
  313 + public User assignUserToEdge(@PathVariable(EDGE_ID) String strEdgeId,
  314 + @PathVariable(USER_ID) String strUserId) throws ThingsboardException {
  315 + checkParameter(EDGE_ID, strEdgeId);
  316 + checkParameter(USER_ID, strUserId);
  317 + try {
  318 + EdgeId edgeId = new EdgeId(toUUID(strEdgeId));
  319 + Edge edge = checkEdgeId(edgeId, Operation.READ);
  320 +
  321 + UserId userId = new UserId(toUUID(strUserId));
  322 + checkUserId(userId, Operation.ASSIGN_TO_EDGE);
  323 +
  324 + User savedUser = checkNotNull(userService.assignUserToEdge(getTenantId(), userId, edgeId));
  325 +
  326 + logEntityAction(userId, savedUser,
  327 + savedUser.getCustomerId(),
  328 + ActionType.ASSIGNED_TO_EDGE, null, strUserId, strEdgeId, edge.getName());
  329 +
  330 + return savedUser;
  331 + } catch (Exception e) {
  332 +
  333 + logEntityAction(emptyId(EntityType.USER), null,
  334 + null,
  335 + ActionType.ASSIGNED_TO_EDGE, e, strUserId, strEdgeId);
  336 +
  337 + throw handleException(e);
  338 + }
  339 + }
  340 +
  341 + @PreAuthorize("hasAuthority('TENANT_ADMIN')")
  342 + @RequestMapping(value = "/edge/{edgeId}/user/{userId}", method = RequestMethod.DELETE)
  343 + @ResponseBody
  344 + public User unassignUserFromEdge(@PathVariable(EDGE_ID) String strEdgeId,
  345 + @PathVariable(USER_ID) String strUserId) throws ThingsboardException {
  346 + checkParameter(EDGE_ID, strEdgeId);
  347 + checkParameter(USER_ID, strUserId);
  348 + try {
  349 + EdgeId edgeId = new EdgeId(toUUID(strEdgeId));
  350 + Edge edge = checkEdgeId(edgeId, Operation.READ);
  351 +
  352 + UserId userId = new UserId(toUUID(strUserId));
  353 + User user = checkUserId(userId, Operation.UNASSIGN_FROM_EDGE);
  354 +
  355 + User savedUser = checkNotNull(userService.unassignUserFromEdge(getTenantId(), userId, edgeId));
  356 +
  357 + logEntityAction(userId, savedUser,
  358 + savedUser.getCustomerId(),
  359 + ActionType.UNASSIGNED_FROM_EDGE, null, strUserId, edge.getId().toString(), edge.getName());
  360 +
  361 + return savedUser;
  362 + } catch (Exception e) {
  363 +
  364 + logEntityAction(emptyId(EntityType.USER), null,
  365 + null,
  366 + ActionType.UNASSIGNED_FROM_EDGE, e, strUserId);
  367 +
  368 + throw handleException(e);
  369 + }
  370 + }
  371 +
  372 + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
  373 + @RequestMapping(value = "/edge/{edgeId}/users", params = {"limit"}, method = RequestMethod.GET)
  374 + @ResponseBody
  375 + public TimePageData<User> getEdgeUsers(
  376 + @PathVariable(EDGE_ID) String strEdgeId,
  377 + @RequestParam int limit,
  378 + @RequestParam(required = false) Long startTime,
  379 + @RequestParam(required = false) Long endTime,
  380 + @RequestParam(required = false, defaultValue = "false") boolean ascOrder,
  381 + @RequestParam(required = false) String offset) throws ThingsboardException {
  382 + checkParameter(EDGE_ID, strEdgeId);
  383 + try {
  384 + TenantId tenantId = getCurrentUser().getTenantId();
  385 + EdgeId edgeId = new EdgeId(toUUID(strEdgeId));
  386 + checkEdgeId(edgeId, Operation.READ);
  387 + TimePageLink pageLink = createPageLink(limit, startTime, endTime, ascOrder, offset);
  388 + return checkNotNull(userService.findUsersByTenantIdAndEdgeId(tenantId, edgeId, pageLink).get());
  389 + } catch (Exception e) {
  390 + throw handleException(e);
  391 + }
  392 + }
303 393 }
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.service.edge;
17 17
18 18 import lombok.Data;
  19 +import lombok.Getter;
19 20 import org.springframework.beans.factory.annotation.Autowired;
20 21 import org.springframework.context.annotation.Lazy;
21 22 import org.springframework.stereotype.Component;
... ... @@ -37,9 +38,10 @@ import org.thingsboard.server.service.edge.rpc.constructor.DashboardUpdateMsgCon
37 38 import org.thingsboard.server.service.edge.rpc.constructor.DeviceUpdateMsgConstructor;
38 39 import org.thingsboard.server.service.edge.rpc.constructor.EntityViewUpdateMsgConstructor;
39 40 import org.thingsboard.server.service.edge.rpc.constructor.RelationUpdateMsgConstructor;
  41 +import org.thingsboard.server.service.edge.rpc.constructor.RuleChainUpdateMsgConstructor;
40 42 import org.thingsboard.server.service.edge.rpc.constructor.UserUpdateMsgConstructor;
41 43 import org.thingsboard.server.service.edge.rpc.init.SyncEdgeService;
42   -import org.thingsboard.server.service.edge.rpc.constructor.RuleChainUpdateMsgConstructor;
  44 +import org.thingsboard.server.service.executors.DbCallbackExecutorService;
43 45 import org.thingsboard.server.service.queue.TbClusterService;
44 46 import org.thingsboard.server.service.state.DeviceStateService;
45 47
... ... @@ -138,4 +140,8 @@ public class EdgeContextComponent {
138 140 @Lazy
139 141 @Autowired
140 142 private EdgeEventStorageSettings edgeEventStorageSettings;
  143 +
  144 + @Autowired
  145 + @Getter
  146 + private DbCallbackExecutorService dbCallbackExecutor;
141 147 }
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -141,7 +141,7 @@ public final class EdgeGrpcSession implements Closeable {
141 141 outputStream.onError(new RuntimeException(responseMsg.getErrorMsg()));
142 142 }
143 143 if (ConnectResponseCode.ACCEPTED == responseMsg.getResponseCode()) {
144   - ctx.getSyncEdgeService().sync(edge, outputStream);
  144 + ctx.getSyncEdgeService().sync(ctx, edge, outputStream);
145 145 }
146 146 }
147 147 if (connected) {
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -17,7 +17,6 @@ package org.thingsboard.server.service.edge.rpc.init;
17 17
18 18 import com.google.common.util.concurrent.Futures;
19 19 import com.google.common.util.concurrent.ListenableFuture;
20   -import com.google.common.util.concurrent.MoreExecutors;
21 20 import io.grpc.stub.StreamObserver;
22 21 import lombok.extern.slf4j.Slf4j;
23 22 import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -26,7 +25,7 @@ import org.thingsboard.server.common.data.Dashboard;
26 25 import org.thingsboard.server.common.data.DashboardInfo;
27 26 import org.thingsboard.server.common.data.Device;
28 27 import org.thingsboard.server.common.data.EntityView;
29   -import org.thingsboard.server.common.data.Tenant;
  28 +import org.thingsboard.server.common.data.User;
30 29 import org.thingsboard.server.common.data.asset.Asset;
31 30 import org.thingsboard.server.common.data.edge.Edge;
32 31 import org.thingsboard.server.common.data.id.EntityId;
... ... @@ -45,6 +44,7 @@ import org.thingsboard.server.dao.device.DeviceService;
45 44 import org.thingsboard.server.dao.entityview.EntityViewService;
46 45 import org.thingsboard.server.dao.relation.RelationService;
47 46 import org.thingsboard.server.dao.rule.RuleChainService;
  47 +import org.thingsboard.server.dao.user.UserService;
48 48 import org.thingsboard.server.gen.edge.AssetUpdateMsg;
49 49 import org.thingsboard.server.gen.edge.DashboardUpdateMsg;
50 50 import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
... ... @@ -56,12 +56,15 @@ import org.thingsboard.server.gen.edge.RuleChainMetadataRequestMsg;
56 56 import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg;
57 57 import org.thingsboard.server.gen.edge.RuleChainUpdateMsg;
58 58 import org.thingsboard.server.gen.edge.UpdateMsgType;
  59 +import org.thingsboard.server.gen.edge.UserUpdateMsg;
  60 +import org.thingsboard.server.service.edge.EdgeContextComponent;
59 61 import org.thingsboard.server.service.edge.rpc.constructor.AssetUpdateMsgConstructor;
60 62 import org.thingsboard.server.service.edge.rpc.constructor.DashboardUpdateMsgConstructor;
61 63 import org.thingsboard.server.service.edge.rpc.constructor.DeviceUpdateMsgConstructor;
62 64 import org.thingsboard.server.service.edge.rpc.constructor.EntityViewUpdateMsgConstructor;
63 65 import org.thingsboard.server.service.edge.rpc.constructor.RelationUpdateMsgConstructor;
64 66 import org.thingsboard.server.service.edge.rpc.constructor.RuleChainUpdateMsgConstructor;
  67 +import org.thingsboard.server.service.edge.rpc.constructor.UserUpdateMsgConstructor;
65 68
66 69 import java.util.ArrayList;
67 70 import java.util.HashSet;
... ... @@ -92,10 +95,10 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
92 95 private DashboardService dashboardService;
93 96
94 97 @Autowired
95   - private RuleChainUpdateMsgConstructor ruleChainUpdateMsgConstructor;
  98 + private UserService userService;
96 99
97 100 @Autowired
98   - private RelationUpdateMsgConstructor relationUpdateMsgConstructor;
  101 + private RuleChainUpdateMsgConstructor ruleChainUpdateMsgConstructor;
99 102
100 103 @Autowired
101 104 private DeviceUpdateMsgConstructor deviceUpdateMsgConstructor;
... ... @@ -109,68 +112,56 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
109 112 @Autowired
110 113 private DashboardUpdateMsgConstructor dashboardUpdateMsgConstructor;
111 114
  115 + @Autowired
  116 + private UserUpdateMsgConstructor userUpdateMsgConstructor;
  117 +
  118 + @Autowired
  119 + private RelationUpdateMsgConstructor relationUpdateMsgConstructor;
  120 +
112 121 @Override
113   - public void sync(Edge edge, StreamObserver<ResponseMsg> outputStream) {
  122 + public void sync(EdgeContextComponent ctx, Edge edge, StreamObserver<ResponseMsg> outputStream) {
114 123 Set<EntityId> pushedEntityIds = new HashSet<>();
115 124 syncRuleChains(edge, pushedEntityIds, outputStream);
116 125 syncDevices(edge, pushedEntityIds, outputStream);
117 126 syncAssets(edge, pushedEntityIds, outputStream);
118 127 syncEntityViews(edge, pushedEntityIds, outputStream);
119 128 syncDashboards(edge, pushedEntityIds, outputStream);
120   - syncRelations(edge, pushedEntityIds, outputStream);
  129 + syncUsers(ctx, edge, pushedEntityIds, outputStream);
  130 + syncRelations(ctx, edge, pushedEntityIds, outputStream);
121 131 }
122 132
123   - private void syncRelations(Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) {
124   - if (!pushedEntityIds.isEmpty()) {
125   - List<ListenableFuture<List<EntityRelation>>> futures = new ArrayList<>();
126   - for (EntityId entityId : pushedEntityIds) {
127   - futures.add(syncRelations(edge, entityId, EntitySearchDirection.FROM));
128   - futures.add(syncRelations(edge, entityId, EntitySearchDirection.TO));
129   - }
130   - ListenableFuture<List<List<EntityRelation>>> relationsListFuture = Futures.allAsList(futures);
131   - Futures.transform(relationsListFuture, relationsList -> {
132   - try {
133   - Set<EntityRelation> uniqueEntityRelations = new HashSet<>();
134   - if (!relationsList.isEmpty()) {
135   - for (List<EntityRelation> entityRelations : relationsList) {
136   - if (!entityRelations.isEmpty()) {
137   - uniqueEntityRelations.addAll(entityRelations);
138   - }
139   - }
140   - }
141   - if (!uniqueEntityRelations.isEmpty()) {
142   - log.trace("[{}] [{}] relation(s) are going to be pushed to edge.", edge.getId(), uniqueEntityRelations.size());
143   - for (EntityRelation relation : uniqueEntityRelations) {
144   - try {
145   - RelationUpdateMsg relationUpdateMsg =
146   - relationUpdateMsgConstructor.constructRelationUpdatedMsg(
147   - UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE,
148   - relation);
149   - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
150   - .setRelationUpdateMsg(relationUpdateMsg)
151   - .build();
152   - outputStream.onNext(ResponseMsg.newBuilder()
153   - .setEntityUpdateMsg(entityUpdateMsg)
154   - .build());
155   - } catch (Exception e) {
156   - log.error("Exception during loading relation [{}] to edge on init!", relation, e);
157   - }
158   - }
  133 + private void syncRuleChains(Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) {
  134 + try {
  135 + TimePageLink pageLink = new TimePageLink(100);
  136 + TimePageData<RuleChain> pageData;
  137 + do {
  138 + pageData = ruleChainService.findRuleChainsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink).get();
  139 + if (!pageData.getData().isEmpty()) {
  140 + log.trace("[{}] [{}] rule chains(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
  141 + for (RuleChain ruleChain : pageData.getData()) {
  142 + RuleChainUpdateMsg ruleChainUpdateMsg =
  143 + ruleChainUpdateMsgConstructor.constructRuleChainUpdatedMsg(
  144 + edge.getRootRuleChainId(),
  145 + UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE,
  146 + ruleChain);
  147 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  148 + .setRuleChainUpdateMsg(ruleChainUpdateMsg)
  149 + .build();
  150 + outputStream.onNext(ResponseMsg.newBuilder()
  151 + .setEntityUpdateMsg(entityUpdateMsg)
  152 + .build());
  153 + pushedEntityIds.add(ruleChain.getId());
159 154 }
160   - } catch (Exception e) {
161   - log.error("Exception during loading relation(s) to edge on init!", e);
162 155 }
163   - return null;
164   - }, MoreExecutors.directExecutor());
  156 + if (pageData.hasNext()) {
  157 + pageLink = pageData.getNextPageLink();
  158 + }
  159 + } while (pageData.hasNext());
  160 + } catch (Exception e) {
  161 + log.error("Exception during loading edge rule chain(s) on sync!", e);
165 162 }
166 163 }
167 164
168   - private ListenableFuture<List<EntityRelation>> syncRelations(Edge edge, EntityId entityId, EntitySearchDirection direction) {
169   - EntityRelationsQuery query = new EntityRelationsQuery();
170   - query.setParameters(new RelationsSearchParameters(entityId, direction, -1, false));
171   - return relationService.findByQuery(edge.getTenantId(), query);
172   - }
173   -
174 165 private void syncDevices(Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) {
175 166 try {
176 167 TimePageLink pageLink = new TimePageLink(100);
... ... @@ -198,7 +189,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
198 189 }
199 190 } while (pageData.hasNext());
200 191 } catch (Exception e) {
201   - log.error("Exception during loading edge device(s) on init!", e);
  192 + log.error("Exception during loading edge device(s) on sync!", e);
202 193 }
203 194 }
204 195
... ... @@ -229,7 +220,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
229 220 }
230 221 } while (pageData.hasNext());
231 222 } catch (Exception e) {
232   - log.error("Exception during loading edge asset(s) on init!", e);
  223 + log.error("Exception during loading edge asset(s) on sync!", e);
233 224 }
234 225 }
235 226
... ... @@ -260,7 +251,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
260 251 }
261 252 } while (pageData.hasNext());
262 253 } catch (Exception e) {
263   - log.error("Exception during loading edge entity view(s) on init!", e);
  254 + log.error("Exception during loading edge entity view(s) on sync!", e);
264 255 }
265 256 }
266 257
... ... @@ -292,31 +283,30 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
292 283 }
293 284 } while (pageData.hasNext());
294 285 } catch (Exception e) {
295   - log.error("Exception during loading edge dashboard(s) on init!", e);
  286 + log.error("Exception during loading edge dashboard(s) on sync!", e);
296 287 }
297 288 }
298 289
299   - private void syncRuleChains(Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) {
  290 + private void syncUsers(EdgeContextComponent ctx, Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) {
300 291 try {
301 292 TimePageLink pageLink = new TimePageLink(100);
302   - TimePageData<RuleChain> pageData;
  293 + TimePageData<User> pageData;
303 294 do {
304   - pageData = ruleChainService.findRuleChainsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink).get();
  295 + pageData = userService.findUsersByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink).get();
305 296 if (!pageData.getData().isEmpty()) {
306   - log.trace("[{}] [{}] rule chains(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
307   - for (RuleChain ruleChain : pageData.getData()) {
308   - RuleChainUpdateMsg ruleChainUpdateMsg =
309   - ruleChainUpdateMsgConstructor.constructRuleChainUpdatedMsg(
310   - edge.getRootRuleChainId(),
311   - UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE,
312   - ruleChain);
  297 + log.trace("[{}] [{}] user(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
  298 + for (User user : pageData.getData()) {
  299 + UserUpdateMsg userUpdateMsg =
  300 + userUpdateMsgConstructor.constructUserUpdatedMsg(
  301 + UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE,
  302 + user);
313 303 EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
314   - .setRuleChainUpdateMsg(ruleChainUpdateMsg)
  304 + .setUserUpdateMsg(userUpdateMsg)
315 305 .build();
316 306 outputStream.onNext(ResponseMsg.newBuilder()
317 307 .setEntityUpdateMsg(entityUpdateMsg)
318 308 .build());
319   - pushedEntityIds.add(ruleChain.getId());
  309 + pushedEntityIds.add(user.getId());
320 310 }
321 311 }
322 312 if (pageData.hasNext()) {
... ... @@ -324,10 +314,62 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
324 314 }
325 315 } while (pageData.hasNext());
326 316 } catch (Exception e) {
327   - log.error("Exception during loading edge rule chain(s) on init!", e);
  317 + log.error("Exception during loading edge user(s) on sync!", e);
  318 + }
  319 + }
  320 +
  321 + private void syncRelations(EdgeContextComponent ctx, Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) {
  322 + if (!pushedEntityIds.isEmpty()) {
  323 + List<ListenableFuture<List<EntityRelation>>> futures = new ArrayList<>();
  324 + for (EntityId entityId : pushedEntityIds) {
  325 + futures.add(syncRelations(edge, entityId, EntitySearchDirection.FROM));
  326 + futures.add(syncRelations(edge, entityId, EntitySearchDirection.TO));
  327 + }
  328 + ListenableFuture<List<List<EntityRelation>>> relationsListFuture = Futures.allAsList(futures);
  329 + Futures.transform(relationsListFuture, relationsList -> {
  330 + try {
  331 + Set<EntityRelation> uniqueEntityRelations = new HashSet<>();
  332 + if (!relationsList.isEmpty()) {
  333 + for (List<EntityRelation> entityRelations : relationsList) {
  334 + if (!entityRelations.isEmpty()) {
  335 + uniqueEntityRelations.addAll(entityRelations);
  336 + }
  337 + }
  338 + }
  339 + if (!uniqueEntityRelations.isEmpty()) {
  340 + log.trace("[{}] [{}] relation(s) are going to be pushed to edge.", edge.getId(), uniqueEntityRelations.size());
  341 + for (EntityRelation relation : uniqueEntityRelations) {
  342 + try {
  343 + RelationUpdateMsg relationUpdateMsg =
  344 + relationUpdateMsgConstructor.constructRelationUpdatedMsg(
  345 + UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE,
  346 + relation);
  347 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  348 + .setRelationUpdateMsg(relationUpdateMsg)
  349 + .build();
  350 + outputStream.onNext(ResponseMsg.newBuilder()
  351 + .setEntityUpdateMsg(entityUpdateMsg)
  352 + .build());
  353 + } catch (Exception e) {
  354 + log.error("Exception during loading relation [{}] to edge on sync!", relation, e);
  355 + }
  356 + }
  357 + }
  358 + } catch (Exception e) {
  359 + log.error("Exception during loading relation(s) to edge on sync!", e);
  360 + }
  361 + return null;
  362 + }, ctx.getDbCallbackExecutor());
328 363 }
329 364 }
330 365
  366 + private ListenableFuture<List<EntityRelation>> syncRelations(Edge edge, EntityId entityId, EntitySearchDirection direction) {
  367 + EntityRelationsQuery query = new EntityRelationsQuery();
  368 + query.setParameters(new RelationsSearchParameters(entityId, direction, -1, false));
  369 + return relationService.findByQuery(edge.getTenantId(), query);
  370 + }
  371 +
  372 +
331 373 @Override
332 374 public void syncRuleChainMetadata(Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg, StreamObserver<ResponseMsg> outputStream) {
333 375 if (ruleChainMetadataRequestMsg.getRuleChainIdMSB() != 0 && ruleChainMetadataRequestMsg.getRuleChainIdLSB() != 0) {
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -19,10 +19,11 @@ import io.grpc.stub.StreamObserver;
19 19 import org.thingsboard.server.common.data.edge.Edge;
20 20 import org.thingsboard.server.gen.edge.ResponseMsg;
21 21 import org.thingsboard.server.gen.edge.RuleChainMetadataRequestMsg;
  22 +import org.thingsboard.server.service.edge.EdgeContextComponent;
22 23
23 24 public interface SyncEdgeService {
24 25
25   - void sync(Edge edge, StreamObserver<ResponseMsg> outputStream);
  26 + void sync(EdgeContextComponent ctx, Edge edge, StreamObserver<ResponseMsg> outputStream);
26 27
27 28 void syncRuleChainMetadata(Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg, StreamObserver<ResponseMsg> outputStream);
28 29 }
... ...
... ... @@ -17,12 +17,17 @@ package org.thingsboard.server.dao.user;
17 17
18 18 import com.google.common.util.concurrent.ListenableFuture;
19 19 import org.thingsboard.server.common.data.User;
  20 +import org.thingsboard.server.common.data.asset.Asset;
  21 +import org.thingsboard.server.common.data.id.AssetId;
20 22 import org.thingsboard.server.common.data.id.CustomerId;
  23 +import org.thingsboard.server.common.data.id.EdgeId;
21 24 import org.thingsboard.server.common.data.id.TenantId;
22 25 import org.thingsboard.server.common.data.id.UserCredentialsId;
23 26 import org.thingsboard.server.common.data.id.UserId;
24 27 import org.thingsboard.server.common.data.page.TextPageData;
25 28 import org.thingsboard.server.common.data.page.TextPageLink;
  29 +import org.thingsboard.server.common.data.page.TimePageData;
  30 +import org.thingsboard.server.common.data.page.TimePageLink;
26 31 import org.thingsboard.server.common.data.security.UserCredentials;
27 32
28 33 public interface UserService {
... ... @@ -66,4 +71,10 @@ public interface UserService {
66 71 void onUserLoginSuccessful(TenantId tenantId, UserId userId);
67 72
68 73 int onUserLoginIncorrectCredentials(TenantId tenantId, UserId userId);
  74 +
  75 + User assignUserToEdge(TenantId tenantId, UserId userId, EdgeId edgeId);
  76 +
  77 + User unassignUserFromEdge(TenantId tenantId, UserId userId, EdgeId edgeId);
  78 +
  79 + ListenableFuture<TimePageData<User>> findUsersByTenantIdAndEdgeId(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink);
69 80 }
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ...
... ... @@ -80,10 +80,8 @@ import static org.thingsboard.server.dao.service.Validator.validateString;
80 80 public class BaseAssetService extends AbstractEntityService implements AssetService {
81 81
82 82 public static final String INCORRECT_TENANT_ID = "Incorrect tenantId ";
83   - public static final String INCORRECT_PAGE_LINK = "Incorrect page link ";
84 83 public static final String INCORRECT_CUSTOMER_ID = "Incorrect customerId ";
85 84 public static final String INCORRECT_ASSET_ID = "Incorrect assetId ";
86   - public static final String INCORRECT_EDGE_ID = "Incorrect edgeId ";
87 85
88 86 @Autowired
89 87 private AssetDao assetDao;
... ...
... ... @@ -31,6 +31,9 @@ import java.util.concurrent.ExecutionException;
31 31 @Slf4j
32 32 public abstract class AbstractEntityService {
33 33
  34 + public static final String INCORRECT_EDGE_ID = "Incorrect edgeId ";
  35 + public static final String INCORRECT_PAGE_LINK = "Incorrect page link ";
  36 +
34 37 @Autowired
35 38 protected RelationService relationService;
36 39
... ...
... ... @@ -15,20 +15,31 @@
15 15 */
16 16 package org.thingsboard.server.dao.sql.user;
17 17
  18 +import com.google.common.util.concurrent.Futures;
  19 +import com.google.common.util.concurrent.ListenableFuture;
  20 +import com.google.common.util.concurrent.MoreExecutors;
  21 +import lombok.extern.slf4j.Slf4j;
18 22 import org.springframework.beans.factory.annotation.Autowired;
19 23 import org.springframework.data.domain.PageRequest;
20 24 import org.springframework.data.repository.CrudRepository;
21 25 import org.springframework.stereotype.Component;
  26 +import org.thingsboard.server.common.data.EntityType;
22 27 import org.thingsboard.server.common.data.User;
  28 +import org.thingsboard.server.common.data.id.EdgeId;
23 29 import org.thingsboard.server.common.data.id.TenantId;
24 30 import org.thingsboard.server.common.data.page.TextPageLink;
  31 +import org.thingsboard.server.common.data.page.TimePageLink;
  32 +import org.thingsboard.server.common.data.relation.EntityRelation;
  33 +import org.thingsboard.server.common.data.relation.RelationTypeGroup;
25 34 import org.thingsboard.server.common.data.security.Authority;
26 35 import org.thingsboard.server.dao.DaoUtil;
27 36 import org.thingsboard.server.dao.model.sql.UserEntity;
  37 +import org.thingsboard.server.dao.relation.RelationDao;
28 38 import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao;
29 39 import org.thingsboard.server.dao.user.UserDao;
30 40 import org.thingsboard.server.dao.util.SqlDao;
31 41
  42 +import java.util.ArrayList;
32 43 import java.util.List;
33 44 import java.util.Objects;
34 45 import java.util.UUID;
... ... @@ -41,11 +52,15 @@ import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID_STR;
41 52 */
42 53 @Component
43 54 @SqlDao
  55 +@Slf4j
44 56 public class JpaUserDao extends JpaAbstractSearchTextDao<UserEntity, User> implements UserDao {
45 57
46 58 @Autowired
47 59 private UserRepository userRepository;
48 60
  61 + @Autowired
  62 + private RelationDao relationDao;
  63 +
49 64 @Override
50 65 protected Class<UserEntity> getEntityClass() {
51 66 return UserEntity.class;
... ... @@ -87,4 +102,17 @@ public class JpaUserDao extends JpaAbstractSearchTextDao<UserEntity, User> imple
87 102 PageRequest.of(0, pageLink.getLimit())));
88 103
89 104 }
  105 +
  106 + @Override
  107 + public ListenableFuture<List<User>> findUsersByTenantIdAndEdgeId(UUID tenantId, UUID edgeId, TimePageLink pageLink) {
  108 + log.debug("Try to find users by tenantId [{}], edgeId [{}] and pageLink [{}]", tenantId, edgeId, pageLink);
  109 + ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(new TenantId(tenantId), new EdgeId(edgeId), EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE, EntityType.USER, pageLink);
  110 + return Futures.transformAsync(relations, input -> {
  111 + List<ListenableFuture<User>> userFutures = new ArrayList<>(input.size());
  112 + for (EntityRelation relation : input) {
  113 + userFutures.add(findByIdAsync(new TenantId(tenantId), relation.getTo().getId()));
  114 + }
  115 + return Futures.successfulAsList(userFutures);
  116 + }, MoreExecutors.directExecutor());
  117 + }
90 118 }
... ...
... ... @@ -16,18 +16,30 @@
16 16 package org.thingsboard.server.dao.user;
17 17
18 18 import com.datastax.driver.core.querybuilder.Select.Where;
  19 +import com.google.common.util.concurrent.Futures;
  20 +import com.google.common.util.concurrent.ListenableFuture;
  21 +import com.google.common.util.concurrent.MoreExecutors;
19 22 import lombok.extern.slf4j.Slf4j;
  23 +import org.springframework.beans.factory.annotation.Autowired;
20 24 import org.springframework.stereotype.Component;
  25 +import org.thingsboard.server.common.data.EntityType;
21 26 import org.thingsboard.server.common.data.User;
  27 +import org.thingsboard.server.common.data.asset.Asset;
  28 +import org.thingsboard.server.common.data.id.EdgeId;
22 29 import org.thingsboard.server.common.data.id.TenantId;
23 30 import org.thingsboard.server.common.data.page.TextPageLink;
  31 +import org.thingsboard.server.common.data.page.TimePageLink;
  32 +import org.thingsboard.server.common.data.relation.EntityRelation;
  33 +import org.thingsboard.server.common.data.relation.RelationTypeGroup;
24 34 import org.thingsboard.server.common.data.security.Authority;
25 35 import org.thingsboard.server.dao.DaoUtil;
26 36 import org.thingsboard.server.dao.model.ModelConstants;
27 37 import org.thingsboard.server.dao.model.nosql.UserEntity;
28 38 import org.thingsboard.server.dao.nosql.CassandraAbstractSearchTextDao;
  39 +import org.thingsboard.server.dao.relation.RelationDao;
29 40 import org.thingsboard.server.dao.util.NoSqlDao;
30 41
  42 +import java.util.ArrayList;
31 43 import java.util.Arrays;
32 44 import java.util.List;
33 45 import java.util.UUID;
... ... @@ -40,6 +52,9 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
40 52 @NoSqlDao
41 53 public class CassandraUserDao extends CassandraAbstractSearchTextDao<UserEntity, User> implements UserDao {
42 54
  55 + @Autowired
  56 + private RelationDao relationDao;
  57 +
43 58 @Override
44 59 protected Class<UserEntity> getColumnFamilyClass() {
45 60 return UserEntity.class;
... ... @@ -86,4 +101,16 @@ public class CassandraUserDao extends CassandraAbstractSearchTextDao<UserEntity,
86 101 return DaoUtil.convertDataList(userEntities);
87 102 }
88 103
  104 + @Override
  105 + public ListenableFuture<List<User>> findUsersByTenantIdAndEdgeId(UUID tenantId, UUID edgeId, TimePageLink pageLink) {
  106 + log.debug("Try to find users by tenantId [{}], edgeId [{}] and pageLink [{}]", tenantId, edgeId, pageLink);
  107 + ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(new TenantId(tenantId), new EdgeId(edgeId), EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE, EntityType.USER, pageLink);
  108 + return Futures.transformAsync(relations, input -> {
  109 + List<ListenableFuture<User>> userFutures = new ArrayList<>(input.size());
  110 + for (EntityRelation relation : input) {
  111 + userFutures.add(findByIdAsync(new TenantId(tenantId), relation.getTo().getId()));
  112 + }
  113 + return Futures.successfulAsList(userFutures);
  114 + }, MoreExecutors.directExecutor());
  115 + }
89 116 }
... ...
... ... @@ -15,9 +15,11 @@
15 15 */
16 16 package org.thingsboard.server.dao.user;
17 17
  18 +import com.google.common.util.concurrent.ListenableFuture;
18 19 import org.thingsboard.server.common.data.User;
19 20 import org.thingsboard.server.common.data.id.TenantId;
20 21 import org.thingsboard.server.common.data.page.TextPageLink;
  22 +import org.thingsboard.server.common.data.page.TimePageLink;
21 23 import org.thingsboard.server.dao.Dao;
22 24
23 25 import java.util.List;
... ... @@ -59,5 +61,15 @@ public interface UserDao extends Dao<User> {
59 61 * @return the list of user entities
60 62 */
61 63 List<User> findCustomerUsers(UUID tenantId, UUID customerId, TextPageLink pageLink);
  64 +
  65 + /**
  66 + * Find users by tenantId, edgeId and page link.
  67 + *
  68 + * @param tenantId the tenantId
  69 + * @param edgeId the edgeId
  70 + * @param pageLink the page link
  71 + * @return the list of user objects
  72 + */
  73 + ListenableFuture<List<User>> findUsersByTenantIdAndEdgeId(UUID tenantId, UUID edgeId, TimePageLink pageLink);
62 74
63 75 }
... ...
... ... @@ -18,7 +18,10 @@ package org.thingsboard.server.dao.user;
18 18 import com.fasterxml.jackson.databind.JsonNode;
19 19 import com.fasterxml.jackson.databind.ObjectMapper;
20 20 import com.fasterxml.jackson.databind.node.ObjectNode;
  21 +import com.google.common.base.Function;
  22 +import com.google.common.util.concurrent.Futures;
21 23 import com.google.common.util.concurrent.ListenableFuture;
  24 +import com.google.common.util.concurrent.MoreExecutors;
22 25 import lombok.extern.slf4j.Slf4j;
23 26 import org.apache.commons.lang3.RandomStringUtils;
24 27 import org.apache.commons.lang3.StringUtils;
... ... @@ -28,15 +31,22 @@ import org.springframework.stereotype.Service;
28 31 import org.thingsboard.server.common.data.Customer;
29 32 import org.thingsboard.server.common.data.Tenant;
30 33 import org.thingsboard.server.common.data.User;
  34 +import org.thingsboard.server.common.data.edge.Edge;
31 35 import org.thingsboard.server.common.data.id.CustomerId;
  36 +import org.thingsboard.server.common.data.id.EdgeId;
32 37 import org.thingsboard.server.common.data.id.TenantId;
33 38 import org.thingsboard.server.common.data.id.UserCredentialsId;
34 39 import org.thingsboard.server.common.data.id.UserId;
35 40 import org.thingsboard.server.common.data.page.TextPageData;
36 41 import org.thingsboard.server.common.data.page.TextPageLink;
  42 +import org.thingsboard.server.common.data.page.TimePageData;
  43 +import org.thingsboard.server.common.data.page.TimePageLink;
  44 +import org.thingsboard.server.common.data.relation.EntityRelation;
  45 +import org.thingsboard.server.common.data.relation.RelationTypeGroup;
37 46 import org.thingsboard.server.common.data.security.Authority;
38 47 import org.thingsboard.server.common.data.security.UserCredentials;
39 48 import org.thingsboard.server.dao.customer.CustomerDao;
  49 +import org.thingsboard.server.dao.edge.EdgeService;
40 50 import org.thingsboard.server.dao.entity.AbstractEntityService;
41 51 import org.thingsboard.server.dao.exception.DataValidationException;
42 52 import org.thingsboard.server.dao.exception.IncorrectParameterException;
... ... @@ -45,9 +55,11 @@ import org.thingsboard.server.dao.service.DataValidator;
45 55 import org.thingsboard.server.dao.service.PaginatedRemover;
46 56 import org.thingsboard.server.dao.tenant.TenantDao;
47 57
  58 +import javax.annotation.Nullable;
48 59 import java.util.HashMap;
49 60 import java.util.List;
50 61 import java.util.Map;
  62 +import java.util.concurrent.ExecutionException;
51 63
52 64 import static org.thingsboard.server.dao.service.Validator.validateId;
53 65 import static org.thingsboard.server.dao.service.Validator.validatePageLink;
... ... @@ -85,6 +97,9 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic
85 97 @Autowired
86 98 private CustomerDao customerDao;
87 99
  100 + @Autowired
  101 + private EdgeService edgeService;
  102 +
88 103 @Override
89 104 public User findUserByEmail(TenantId tenantId, String email) {
90 105 log.trace("Executing findUserByEmail [{}]", email);
... ... @@ -312,6 +327,57 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic
312 327 return failedLoginAttempts;
313 328 }
314 329
  330 + @Override
  331 + public User assignUserToEdge(TenantId tenantId, UserId userId, EdgeId edgeId) {
  332 + User user = findUserById(tenantId, userId);
  333 + Edge edge = edgeService.findEdgeById(tenantId, edgeId);
  334 + if (edge == null) {
  335 + throw new DataValidationException("Can't assign user to non-existent edge!");
  336 + }
  337 + if (!edge.getTenantId().getId().equals(user.getTenantId().getId())) {
  338 + throw new DataValidationException("Can't assign user to edge from different tenant!");
  339 + }
  340 + try {
  341 + createRelation(tenantId, new EntityRelation(edgeId, userId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE));
  342 + } catch (ExecutionException | InterruptedException e) {
  343 + log.warn("[{}] Failed to create user relation. Edge Id: [{}]", userId, edgeId);
  344 + throw new RuntimeException(e);
  345 + }
  346 + return user;
  347 + }
  348 +
  349 + @Override
  350 + public User unassignUserFromEdge(TenantId tenantId, UserId userId, EdgeId edgeId) {
  351 + User user = findUserById(tenantId, userId);
  352 + Edge edge = edgeService.findEdgeById(tenantId, edgeId);
  353 + if (edge == null) {
  354 + throw new DataValidationException("Can't unassign user from non-existent edge!");
  355 + }
  356 + try {
  357 + deleteRelation(tenantId, new EntityRelation(edgeId, userId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE));
  358 + } catch (ExecutionException | InterruptedException e) {
  359 + log.warn("[{}] Failed to delete user relation. Edge Id: [{}]", userId, edgeId);
  360 + throw new RuntimeException(e);
  361 + }
  362 + return user;
  363 + }
  364 +
  365 + @Override
  366 + public ListenableFuture<TimePageData<User>> findUsersByTenantIdAndEdgeId(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink) {
  367 + log.trace("Executing findUsersByTenantIdAndEdgeId, tenantId [{}], edgeId [{}], pageLink [{}]", tenantId, edgeId, pageLink);
  368 + validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
  369 + validateId(edgeId, INCORRECT_EDGE_ID + edgeId);
  370 + validatePageLink(pageLink, INCORRECT_PAGE_LINK + pageLink);
  371 + ListenableFuture<List<User>> users = userDao.findUsersByTenantIdAndEdgeId(tenantId.getId(), edgeId.getId(), pageLink);
  372 + return Futures.transform(users, new Function<List<User>, TimePageData<User>>() {
  373 + @Nullable
  374 + @Override
  375 + public TimePageData<User> apply(@Nullable List<User> users) {
  376 + return new TimePageData<>(users, pageLink);
  377 + }
  378 + }, MoreExecutors.directExecutor());
  379 + }
  380 +
315 381 private int increaseFailedLoginAttempts(User user) {
316 382 JsonNode additionalInfo = user.getAdditionalInfo();
317 383 if (!(additionalInfo instanceof ObjectNode)) {
... ...
... ... @@ -331,6 +331,7 @@ public abstract class BaseRuleChainServiceTest extends AbstractServiceTest {
331 331 result = ruleChainService.findDefaultEdgeRuleChainsByTenantId(tenantId).get();
332 332 Assert.assertEquals(1, result.size());
333 333 }
  334 +
334 335 private RuleChainId saveRuleChainAndSetDefaultEdge(String name) {
335 336 RuleChain edgeRuleChain = new RuleChain();
336 337 edgeRuleChain.setTenantId(tenantId);
... ...