Commit bf5f90d2712b2d01553a3829150abfb2079cd3cb

Authored by Volodymyr Babak
1 parent b2f4a082

Added support for user update. Push notification to rule engine in case device was created

... ... @@ -23,12 +23,14 @@ import lombok.extern.slf4j.Slf4j;
23 23 import org.springframework.beans.factory.annotation.Autowired;
24 24 import org.springframework.stereotype.Service;
25 25 import org.thingsboard.server.common.data.EntityType;
  26 +import org.thingsboard.server.common.data.User;
26 27 import org.thingsboard.server.common.data.alarm.Alarm;
27 28 import org.thingsboard.server.common.data.audit.ActionType;
28 29 import org.thingsboard.server.common.data.edge.Edge;
29 30 import org.thingsboard.server.common.data.edge.EdgeEvent;
30 31 import org.thingsboard.server.common.data.edge.EdgeEventType;
31 32 import org.thingsboard.server.common.data.id.AlarmId;
  33 +import org.thingsboard.server.common.data.id.CustomerId;
32 34 import org.thingsboard.server.common.data.id.DashboardId;
33 35 import org.thingsboard.server.common.data.id.EdgeId;
34 36 import org.thingsboard.server.common.data.id.EntityId;
... ... @@ -36,6 +38,7 @@ import org.thingsboard.server.common.data.id.EntityIdFactory;
36 38 import org.thingsboard.server.common.data.id.IdBased;
37 39 import org.thingsboard.server.common.data.id.RuleChainId;
38 40 import org.thingsboard.server.common.data.id.TenantId;
  41 +import org.thingsboard.server.common.data.id.UserId;
39 42 import org.thingsboard.server.common.data.page.TextPageData;
40 43 import org.thingsboard.server.common.data.page.TextPageLink;
41 44 import org.thingsboard.server.common.data.page.TimePageData;
... ... @@ -46,7 +49,9 @@ import org.thingsboard.server.common.msg.queue.TbCallback;
46 49 import org.thingsboard.server.dao.alarm.AlarmService;
47 50 import org.thingsboard.server.dao.edge.EdgeEventService;
48 51 import org.thingsboard.server.dao.edge.EdgeService;
  52 +import org.thingsboard.server.dao.model.ModelConstants;
49 53 import org.thingsboard.server.dao.relation.RelationService;
  54 +import org.thingsboard.server.dao.user.UserService;
50 55 import org.thingsboard.server.gen.transport.TransportProtos;
51 56 import org.thingsboard.server.queue.util.TbCoreComponent;
52 57 import org.thingsboard.server.service.executors.DbCallbackExecutorService;
... ... @@ -78,6 +83,9 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
78 83 private AlarmService alarmService;
79 84
80 85 @Autowired
  86 + private UserService userService;
  87 +
  88 + @Autowired
81 89 private RelationService relationService;
82 90
83 91 @Autowired
... ... @@ -142,6 +150,7 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
142 150 switch (edgeEventType) {
143 151 // TODO: voba - handle edge updates
144 152 // case EDGE:
  153 + case USER:
145 154 case ASSET:
146 155 case DEVICE:
147 156 case ENTITY_VIEW:
... ... @@ -286,6 +295,15 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
286 295 return convertToEdgeIds(edgeService.findEdgesByTenantIdAndDashboardId(tenantId, new DashboardId(entityId.getId())));
287 296 case RULE_CHAIN:
288 297 return convertToEdgeIds(edgeService.findEdgesByTenantIdAndRuleChainId(tenantId, new RuleChainId(entityId.getId())));
  298 + case USER:
  299 + User userById = userService.findUserById(tenantId, new UserId(entityId.getId()));
  300 + TextPageData<Edge> edges;
  301 + if (userById.getCustomerId() == null || userById.getCustomerId().getId().equals(ModelConstants.NULL_UUID)) {
  302 + edges = edgeService.findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE));
  303 + } else {
  304 + edges = edgeService.findEdgesByTenantIdAndCustomerId(tenantId, new CustomerId(entityId.getId()), new TextPageLink(Integer.MAX_VALUE));
  305 + }
  306 + return convertToEdgeIds(Futures.immediateFuture(edges.getData()));
289 307 default:
290 308 return Futures.immediateFuture(Collections.emptyList());
291 309 }
... ...
... ... @@ -33,6 +33,9 @@ import org.thingsboard.server.dao.entityview.EntityViewService;
33 33 import org.thingsboard.server.dao.relation.RelationService;
34 34 import org.thingsboard.server.dao.rule.RuleChainService;
35 35 import org.thingsboard.server.dao.user.UserService;
  36 +import org.thingsboard.server.queue.discovery.PartitionService;
  37 +import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
  38 +import org.thingsboard.server.queue.util.TbCoreComponent;
36 39 import org.thingsboard.server.service.edge.rpc.EdgeEventStorageSettings;
37 40 import org.thingsboard.server.service.edge.rpc.constructor.AlarmUpdateMsgConstructor;
38 41 import org.thingsboard.server.service.edge.rpc.constructor.AssetUpdateMsgConstructor;
... ... @@ -49,6 +52,7 @@ import org.thingsboard.server.service.queue.TbClusterService;
49 52 import org.thingsboard.server.service.state.DeviceStateService;
50 53
51 54 @Component
  55 +@TbCoreComponent
52 56 @Data
53 57 public class EdgeContextComponent {
54 58
... ... @@ -56,6 +60,13 @@ public class EdgeContextComponent {
56 60 @Autowired
57 61 private EdgeService edgeService;
58 62
  63 + @Autowired
  64 + private PartitionService partitionService;
  65 +
  66 + @Autowired
  67 + @Lazy
  68 + private TbQueueProducerProvider producerProvider;
  69 +
59 70 @Lazy
60 71 @Autowired
61 72 private EdgeNotificationService edgeNotificationService;
... ...
... ... @@ -18,10 +18,10 @@ package org.thingsboard.server.service.edge.rpc;
18 18 import com.datastax.driver.core.utils.UUIDs;
19 19 import com.fasterxml.jackson.core.JsonProcessingException;
20 20 import com.fasterxml.jackson.databind.ObjectMapper;
  21 +import com.fasterxml.jackson.databind.node.ObjectNode;
21 22 import com.google.common.util.concurrent.FutureCallback;
22 23 import com.google.common.util.concurrent.Futures;
23 24 import com.google.common.util.concurrent.ListenableFuture;
24   -import com.google.common.util.concurrent.MoreExecutors;
25 25 import com.google.gson.Gson;
26 26 import com.google.gson.JsonElement;
27 27 import com.google.gson.JsonObject;
... ... @@ -44,6 +44,7 @@ import org.thingsboard.server.common.data.edge.Edge;
44 44 import org.thingsboard.server.common.data.edge.EdgeEvent;
45 45 import org.thingsboard.server.common.data.id.AlarmId;
46 46 import org.thingsboard.server.common.data.id.AssetId;
  47 +import org.thingsboard.server.common.data.id.CustomerId;
47 48 import org.thingsboard.server.common.data.id.DashboardId;
48 49 import org.thingsboard.server.common.data.id.DeviceId;
49 50 import org.thingsboard.server.common.data.id.EdgeId;
... ... @@ -66,6 +67,8 @@ import org.thingsboard.server.common.data.security.DeviceCredentialsType;
66 67 import org.thingsboard.server.common.data.security.UserCredentials;
67 68 import org.thingsboard.server.common.msg.TbMsg;
68 69 import org.thingsboard.server.common.msg.TbMsgMetaData;
  70 +import org.thingsboard.server.common.msg.queue.ServiceType;
  71 +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
69 72 import org.thingsboard.server.common.msg.session.SessionMsgType;
70 73 import org.thingsboard.server.common.transport.util.JsonUtils;
71 74 import org.thingsboard.server.gen.edge.AlarmUpdateMsg;
... ... @@ -96,6 +99,10 @@ import org.thingsboard.server.gen.edge.UplinkResponseMsg;
96 99 import org.thingsboard.server.gen.edge.UserCredentialsRequestMsg;
97 100 import org.thingsboard.server.gen.edge.UserCredentialsUpdateMsg;
98 101 import org.thingsboard.server.gen.transport.TransportProtos;
  102 +import org.thingsboard.server.queue.TbQueueCallback;
  103 +import org.thingsboard.server.queue.TbQueueMsgMetadata;
  104 +import org.thingsboard.server.queue.TbQueueProducer;
  105 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
99 106 import org.thingsboard.server.service.edge.EdgeContextComponent;
100 107
101 108 import java.io.Closeable;
... ... @@ -132,6 +139,8 @@ public final class EdgeGrpcSession implements Closeable {
132 139 private StreamObserver<ResponseMsg> outputStream;
133 140 private boolean connected;
134 141
  142 + private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> ruleEngineMsgProducer;
  143 +
135 144 EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream, BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener,
136 145 Consumer<EdgeId> sessionCloseListener, ObjectMapper mapper) {
137 146 this.sessionId = UUID.randomUUID();
... ... @@ -140,6 +149,7 @@ public final class EdgeGrpcSession implements Closeable {
140 149 this.sessionOpenListener = sessionOpenListener;
141 150 this.sessionCloseListener = sessionCloseListener;
142 151 this.mapper = mapper;
  152 + this.ruleEngineMsgProducer = ctx.getProducerProvider().getRuleEngineMsgProducer();
143 153 initInputStream();
144 154 }
145 155
... ... @@ -767,17 +777,19 @@ public final class EdgeGrpcSession implements Closeable {
767 777 Device deviceById = ctx.getDeviceService().findDeviceById(edge.getTenantId(), edgeDeviceId);
768 778 if (deviceById != null) {
769 779 // this ID already used by other device - create new device and update ID on the edge
770   - Device savedDevice = createDevice(deviceUpdateMsg);
  780 + device = createDevice(deviceUpdateMsg);
771 781 EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
772   - .setDeviceUpdateMsg(ctx.getDeviceUpdateMsgConstructor().constructDeviceUpdatedMsg(UpdateMsgType.DEVICE_CONFLICT_RPC_MESSAGE, savedDevice))
  782 + .setDeviceUpdateMsg(ctx.getDeviceUpdateMsgConstructor().constructDeviceUpdatedMsg(UpdateMsgType.DEVICE_CONFLICT_RPC_MESSAGE, device))
773 783 .build();
774 784 outputStream.onNext(ResponseMsg.newBuilder()
775 785 .setEntityUpdateMsg(entityUpdateMsg)
776 786 .build());
777 787 } else {
778   - createDevice(deviceUpdateMsg);
  788 + device = createDevice(deviceUpdateMsg);
779 789 }
780 790 }
  791 + // TODO: voba - assign device only in case device is not assigned yet. Missing functionality to check this relation prior assignment
  792 + ctx.getDeviceService().assignDeviceToEdge(edge.getTenantId(), device.getId(), edge.getId());
781 793 break;
782 794 case ENTITY_UPDATED_RPC_MESSAGE:
783 795 updateDevice(deviceUpdateMsg);
... ... @@ -866,11 +878,9 @@ public final class EdgeGrpcSession implements Closeable {
866 878 device.setType(deviceUpdateMsg.getType());
867 879 device.setLabel(deviceUpdateMsg.getLabel());
868 880 device = ctx.getDeviceService().saveDevice(device);
869   - device = ctx.getDeviceService().assignDeviceToEdge(edge.getTenantId(), device.getId(), edge.getId());
870 881 createRelationFromEdge(device.getId());
871   - ctx.getRelationService().saveRelationAsync(TenantId.SYS_TENANT_ID, new EntityRelation(edge.getId(), device.getId(), "Created"));
872 882 ctx.getDeviceStateService().onDeviceAdded(device);
873   -
  883 + pushDeviceCreatedEventToRuleEngine(device);
874 884 requestDeviceCredentialsFromEdge(device);
875 885 } finally {
876 886 deviceCreationLock.unlock();
... ... @@ -878,6 +888,49 @@ public final class EdgeGrpcSession implements Closeable {
878 888 return device;
879 889 }
880 890
  891 + private void pushDeviceCreatedEventToRuleEngine(Device device) {
  892 + try {
  893 + ObjectNode entityNode = mapper.valueToTree(device);
  894 + TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, device.getId(), getActionTbMsgMetaData(device.getCustomerId()), mapper.writeValueAsString(entityNode));
  895 + sendToRuleEngine(edge.getTenantId(), tbMsg, new TbQueueCallback() {
  896 + @Override
  897 + public void onSuccess(TbQueueMsgMetadata metadata) {
  898 + // TODO: voba - handle success
  899 + }
  900 +
  901 + @Override
  902 + public void onFailure(Throwable t) {
  903 + // TODO: voba - handle failure
  904 + }
  905 + });
  906 + } catch (JsonProcessingException | IllegalArgumentException e) {
  907 + log.warn("[{}] Failed to push device action to rule engine: {}", device.getId(), DataConstants.ENTITY_CREATED, e);
  908 + }
  909 + }
  910 +
  911 + protected void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {
  912 + TopicPartitionInfo tpi = ctx.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, tenantId, tbMsg.getOriginator());
  913 + TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder().setTbMsg(TbMsg.toByteString(tbMsg))
  914 + .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
  915 + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build();
  916 + ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);
  917 + }
  918 +
  919 + private TbMsgMetaData getActionTbMsgMetaData(CustomerId customerId) {
  920 + TbMsgMetaData metaData = getTbMsgMetaData(edge);
  921 + if (customerId != null && !customerId.isNullUid()) {
  922 + metaData.putValue("customerId", customerId.toString());
  923 + }
  924 + return metaData;
  925 + }
  926 +
  927 + private TbMsgMetaData getTbMsgMetaData(Edge edge) {
  928 + TbMsgMetaData metaData = new TbMsgMetaData();
  929 + metaData.putValue("edgeId", edge.getId().toString());
  930 + metaData.putValue("edgeName", edge.getName());
  931 + return metaData;
  932 + }
  933 +
881 934 private EntityId getAlarmOriginator(String entityName, org.thingsboard.server.common.data.EntityType entityType) {
882 935 switch (entityType) {
883 936 case DEVICE:
... ...