Commit f29d15d8b7dab6061d7c306defca36827dd41cee

Authored by Volodymyr Babak
1 parent bf21ff3c

Add/delete users to edge on assign/unassign to/from customer

... ... @@ -718,6 +718,14 @@ public abstract class BaseController {
718 718 return result;
719 719 }
720 720
  721 + protected void sendNotificationMsgToEdgeService(TenantId tenantId, EdgeId edgeId, CustomerId customerId, ActionType edgeEventAction) {
  722 + try {
  723 + sendNotificationMsgToEdgeService(tenantId, edgeId, null, json.writeValueAsString(customerId), EdgeEventType.EDGE, edgeEventAction);
  724 + } catch (Exception e) {
  725 + log.warn("Failed to push assign/unassign to/from customer to core: {}", customerId, e);
  726 + }
  727 + }
  728 +
721 729 protected void sendNotificationMsgToEdgeService(TenantId tenantId, EntityRelation relation, ActionType edgeEventAction) {
722 730 try {
723 731 if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) &&
... ...
... ... @@ -178,6 +178,9 @@ public class EdgeController extends BaseController {
178 178 savedEdge.getCustomerId(),
179 179 ActionType.ASSIGNED_TO_CUSTOMER, null, strEdgeId, strCustomerId, customer.getName());
180 180
  181 + sendNotificationMsgToEdgeService(savedEdge.getTenantId(), savedEdge.getId(),
  182 + customerId, ActionType.ASSIGNED_TO_CUSTOMER);
  183 +
181 184 return savedEdge;
182 185 } catch (Exception e) {
183 186 logEntityAction(emptyId(EntityType.EDGE), null,
... ... @@ -206,6 +209,9 @@ public class EdgeController extends BaseController {
206 209 edge.getCustomerId(),
207 210 ActionType.UNASSIGNED_FROM_CUSTOMER, null, strEdgeId, customer.getId().toString(), customer.getName());
208 211
  212 + sendNotificationMsgToEdgeService(savedEdge.getTenantId(), savedEdge.getId(),
  213 + edge.getCustomerId(), ActionType.UNASSIGNED_FROM_CUSTOMER);
  214 +
209 215 return savedEdge;
210 216 } catch (Exception e) {
211 217 logEntityAction(emptyId(EntityType.EDGE), null,
... ...
... ... @@ -157,8 +157,9 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
157 157 TenantId tenantId = new TenantId(new UUID(edgeNotificationMsg.getTenantIdMSB(), edgeNotificationMsg.getTenantIdLSB()));
158 158 EdgeEventType edgeEventType = EdgeEventType.valueOf(edgeNotificationMsg.getEdgeEventType());
159 159 switch (edgeEventType) {
160   - // TODO: voba - handle edge updates
161   - // case EDGE:
  160 + case EDGE:
  161 + processEdge(tenantId, edgeNotificationMsg);
  162 + break;
162 163 case USER:
163 164 case ASSET:
164 165 case DEVICE:
... ... @@ -186,6 +187,44 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
186 187 }
187 188 }
188 189
  190 + private void processEdge(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
  191 + // TODO: voba - handle edge updates
  192 + try {
  193 + ActionType edgeEventActionType = ActionType.valueOf(edgeNotificationMsg.getEdgeEventAction());
  194 + EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB()));
  195 + switch (edgeEventActionType) {
  196 + case ASSIGNED_TO_CUSTOMER:
  197 + case UNASSIGNED_FROM_CUSTOMER:
  198 + CustomerId customerId = mapper.readValue(edgeNotificationMsg.getEntityBody(), CustomerId.class);
  199 + ListenableFuture<Edge> edgeFuture = edgeService.findEdgeByIdAsync(tenantId, edgeId);
  200 + Futures.addCallback(edgeFuture, new FutureCallback<Edge>() {
  201 + @Override
  202 + public void onSuccess(@Nullable Edge edge) {
  203 + if (edge != null && customerId != null && !EntityId.NULL_UUID.equals(customerId.getId())) {
  204 + ActionType actionType = ActionType.ASSIGNED_TO_CUSTOMER.equals(edgeEventActionType) ? ActionType.ADDED : ActionType.DELETED;
  205 + saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, actionType, customerId, null);
  206 + TextPageData<User> pageData = userService.findCustomerUsers(tenantId, customerId, new TextPageLink(Integer.MAX_VALUE));
  207 + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
  208 + log.trace("[{}] [{}] user(s) are going to be {} to edge.", edge.getId(), pageData.getData().size(), actionType.name());
  209 + for (User user : pageData.getData()) {
  210 + saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.USER, actionType, user.getId(), null);
  211 + }
  212 + }
  213 + }
  214 + }
  215 +
  216 + @Override
  217 + public void onFailure(Throwable t) {
  218 + log.error("Can't find edge by id [{}]", edgeNotificationMsg, t);
  219 + }
  220 + }, dbCallbackExecutorService);
  221 + break;
  222 + }
  223 + } catch (Exception e) {
  224 + log.error("Exception during processing edge event", e);
  225 + }
  226 + }
  227 +
189 228 private void processEntity(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
190 229 ActionType edgeEventActionType = ActionType.valueOf(edgeNotificationMsg.getEdgeEventAction());
191 230 EdgeEventType edgeEventType = EdgeEventType.valueOf(edgeNotificationMsg.getEdgeEventType());
... ...
... ... @@ -41,6 +41,7 @@ import org.thingsboard.server.queue.util.TbCoreComponent;
41 41 import org.thingsboard.server.service.edge.rpc.EdgeEventStorageSettings;
42 42 import org.thingsboard.server.service.edge.rpc.constructor.AlarmUpdateMsgConstructor;
43 43 import org.thingsboard.server.service.edge.rpc.constructor.AssetUpdateMsgConstructor;
  44 +import org.thingsboard.server.service.edge.rpc.constructor.CustomerUpdateMsgConstructor;
44 45 import org.thingsboard.server.service.edge.rpc.constructor.DashboardUpdateMsgConstructor;
45 46 import org.thingsboard.server.service.edge.rpc.constructor.DeviceUpdateMsgConstructor;
46 47 import org.thingsboard.server.service.edge.rpc.constructor.EntityDataMsgConstructor;
... ... @@ -169,6 +170,10 @@ public class EdgeContextComponent {
169 170
170 171 @Lazy
171 172 @Autowired
  173 + private CustomerUpdateMsgConstructor customerUpdateMsgConstructor;
  174 +
  175 + @Lazy
  176 + @Autowired
172 177 private UserUpdateMsgConstructor userUpdateMsgConstructor;
173 178
174 179 @Lazy
... ...
... ... @@ -30,6 +30,7 @@ import lombok.Data;
30 30 import lombok.extern.slf4j.Slf4j;
31 31 import org.apache.commons.lang.RandomStringUtils;
32 32 import org.checkerframework.checker.nullness.qual.Nullable;
  33 +import org.thingsboard.server.common.data.Customer;
33 34 import org.thingsboard.server.common.data.Dashboard;
34 35 import org.thingsboard.server.common.data.DataConstants;
35 36 import org.thingsboard.server.common.data.Device;
... ... @@ -82,6 +83,7 @@ import org.thingsboard.server.gen.edge.AttributesRequestMsg;
82 83 import org.thingsboard.server.gen.edge.ConnectRequestMsg;
83 84 import org.thingsboard.server.gen.edge.ConnectResponseCode;
84 85 import org.thingsboard.server.gen.edge.ConnectResponseMsg;
  86 +import org.thingsboard.server.gen.edge.CustomerUpdateMsg;
85 87 import org.thingsboard.server.gen.edge.DashboardUpdateMsg;
86 88 import org.thingsboard.server.gen.edge.DeviceCredentialsRequestMsg;
87 89 import org.thingsboard.server.gen.edge.DeviceCredentialsUpdateMsg;
... ... @@ -348,6 +350,9 @@ public final class EdgeGrpcSession implements Closeable {
348 350 case DASHBOARD:
349 351 processDashboard(edgeEvent, msgType, edgeEventAction);
350 352 break;
  353 + case CUSTOMER:
  354 + processCustomer(edgeEvent, msgType, edgeEventAction);
  355 + break;
351 356 case RULE_CHAIN:
352 357 processRuleChain(edgeEvent, msgType, edgeEventAction);
353 358 break;
... ... @@ -510,6 +515,36 @@ public final class EdgeGrpcSession implements Closeable {
510 515 }
511 516 }
512 517
  518 + private void processCustomer(EdgeEvent edgeEvent, UpdateMsgType msgType, ActionType edgeEventAction) {
  519 + CustomerId customerId = new CustomerId(edgeEvent.getEntityId());
  520 + EntityUpdateMsg entityUpdateMsg = null;
  521 + switch (edgeEventAction) {
  522 + case ADDED:
  523 + case UPDATED:
  524 + Customer customer = ctx.getCustomerService().findCustomerById(edgeEvent.getTenantId(), customerId);
  525 + if (customer != null) {
  526 + CustomerUpdateMsg customerUpdateMsg =
  527 + ctx.getCustomerUpdateMsgConstructor().constructCustomerUpdatedMsg(msgType, customer);
  528 + entityUpdateMsg = EntityUpdateMsg.newBuilder()
  529 + .setCustomerUpdateMsg(customerUpdateMsg)
  530 + .build();
  531 + }
  532 + break;
  533 + case DELETED:
  534 + CustomerUpdateMsg customerUpdateMsg =
  535 + ctx.getCustomerUpdateMsgConstructor().constructCustomerDeleteMsg(customerId);
  536 + entityUpdateMsg = EntityUpdateMsg.newBuilder()
  537 + .setCustomerUpdateMsg(customerUpdateMsg)
  538 + .build();
  539 + break;
  540 + }
  541 + if (entityUpdateMsg != null) {
  542 + outputStream.onNext(ResponseMsg.newBuilder()
  543 + .setEntityUpdateMsg(entityUpdateMsg)
  544 + .build());
  545 + }
  546 + }
  547 +
513 548 private void processRuleChain(EdgeEvent edgeEvent, UpdateMsgType msgType, ActionType edgeEventAction) {
514 549 RuleChainId ruleChainId = new RuleChainId(edgeEvent.getEntityId());
515 550 EntityUpdateMsg entityUpdateMsg = null;
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.edge.rpc.constructor;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.springframework.stereotype.Component;
  20 +import org.thingsboard.server.common.data.Customer;
  21 +import org.thingsboard.server.common.data.id.CustomerId;
  22 +import org.thingsboard.server.dao.util.mapping.JacksonUtil;
  23 +import org.thingsboard.server.gen.edge.CustomerUpdateMsg;
  24 +import org.thingsboard.server.gen.edge.UpdateMsgType;
  25 +
  26 +@Component
  27 +@Slf4j
  28 +public class CustomerUpdateMsgConstructor {
  29 +
  30 + public CustomerUpdateMsg constructCustomerUpdatedMsg(UpdateMsgType msgType, Customer customer) {
  31 + CustomerUpdateMsg.Builder builder = CustomerUpdateMsg.newBuilder()
  32 + .setMsgType(msgType)
  33 + .setIdMSB(customer.getId().getId().getMostSignificantBits())
  34 + .setIdLSB(customer.getId().getId().getLeastSignificantBits())
  35 + .setTitle(customer.getTitle());
  36 + if (customer.getCountry() != null) {
  37 + builder.setCountry(customer.getCountry());
  38 + }
  39 + if (customer.getState() != null) {
  40 + builder.setState(customer.getState());
  41 + }
  42 + if (customer.getCity() != null) {
  43 + builder.setCity(customer.getCity());
  44 + }
  45 + if (customer.getAddress() != null) {
  46 + builder.setAddress(customer.getAddress());
  47 + }
  48 + if (customer.getAddress2() != null) {
  49 + builder.setAddress2(customer.getAddress2());
  50 + }
  51 + if (customer.getZip() != null) {
  52 + builder.setZip(customer.getZip());
  53 + }
  54 + if (customer.getPhone() != null) {
  55 + builder.setPhone(customer.getPhone());
  56 + }
  57 + if (customer.getEmail() != null) {
  58 + builder.setEmail(customer.getEmail());
  59 + }
  60 + if (customer.getAdditionalInfo() != null) {
  61 + builder.setAdditionalInfo(JacksonUtil.toString(customer.getAdditionalInfo()));
  62 + }
  63 + return builder.build();
  64 + }
  65 +
  66 + public CustomerUpdateMsg constructCustomerDeleteMsg(CustomerId customerId) {
  67 + return CustomerUpdateMsg.newBuilder()
  68 + .setMsgType(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE)
  69 + .setIdMSB(customerId.getId().getMostSignificantBits())
  70 + .setIdLSB(customerId.getId().getLeastSignificantBits()).build();
  71 + }
  72 +}
... ...
... ... @@ -18,6 +18,7 @@ package org.thingsboard.server.service.edge.rpc.constructor;
18 18 import com.google.gson.Gson;
19 19 import com.google.gson.JsonArray;
20 20 import com.google.gson.JsonElement;
  21 +import com.google.gson.JsonNull;
21 22 import com.google.gson.JsonObject;
22 23 import lombok.extern.slf4j.Slf4j;
23 24 import org.springframework.stereotype.Component;
... ... @@ -42,7 +43,10 @@ public class EntityDataMsgConstructor {
42 43 case TIMESERIES_UPDATED:
43 44 try {
44 45 JsonObject data = entityData.getAsJsonObject();
45   - long ts = data.getAsJsonPrimitive("ts").getAsLong();
  46 + long ts = System.currentTimeMillis();
  47 + if (data.get("ts") != null && !data.get("ts").isJsonNull()) {
  48 + ts = data.getAsJsonObject("ts").getAsLong();
  49 + }
46 50 builder.setPostTelemetryMsg(JsonConverter.convertToTelemetryProto(data.getAsJsonObject("data"), ts));
47 51 } catch (Exception e) {
48 52 log.warn("Can't convert to telemetry proto, entityData [{}]", entityData, e);
... ...
... ... @@ -36,6 +36,10 @@ public class UserUpdateMsgConstructor {
36 36 .setIdLSB(user.getId().getId().getLeastSignificantBits())
37 37 .setEmail(user.getEmail())
38 38 .setAuthority(user.getAuthority().name());
  39 + if (user.getCustomerId() != null) {
  40 + builder.setCustomerIdMSB(user.getCustomerId().getId().getMostSignificantBits());
  41 + builder.setCustomerIdLSB(user.getCustomerId().getId().getLeastSignificantBits());
  42 + }
39 43 if (user.getFirstName() != null) {
40 44 builder.setFirstName(user.getFirstName());
41 45 }
... ...
... ... @@ -264,6 +264,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
264 264 TextPageData<User> pageData = userService.findTenantAdmins(edge.getTenantId(), new TextPageLink(Integer.MAX_VALUE));
265 265 pushUsersToEdge(pageData, edge);
266 266 if (edge.getCustomerId() != null && !EntityId.NULL_UUID.equals(edge.getCustomerId().getId())) {
  267 + saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, ActionType.ADDED, edge.getCustomerId(), null);
267 268 pageData = userService.findCustomerUsers(edge.getTenantId(), edge.getCustomerId(), new TextPageLink(Integer.MAX_VALUE));
268 269 pushUsersToEdge(pageData, edge);
269 270 }
... ...
... ... @@ -261,11 +261,13 @@ message UserUpdateMsg {
261 261 UpdateMsgType msgType = 1;
262 262 int64 idMSB = 2;
263 263 int64 idLSB = 3;
264   - string email = 4;
265   - string authority = 5;
266   - string firstName = 6;
267   - string lastName = 7;
268   - string additionalInfo = 8;
  264 + int64 customerIdMSB = 4;
  265 + int64 customerIdLSB = 5;
  266 + string email = 6;
  267 + string authority = 7;
  268 + string firstName = 8;
  269 + string lastName = 9;
  270 + string additionalInfo = 10;
269 271 }
270 272
271 273 message WidgetsBundleUpdateMsg {
... ...