Commit a52b88802bf466fd045348067f92ca23d66e4629

Authored by Volodymyr Babak
1 parent b63087d5

Use default rule chain and queue from device profile in edge session

... ... @@ -552,12 +552,6 @@ public final class EdgeGrpcSession implements Closeable {
552 552 case UPDATED:
553 553 DeviceProfile deviceProfile = ctx.getDeviceProfileService().findDeviceProfileById(edgeEvent.getTenantId(), deviceProfileId);
554 554 if (deviceProfile != null) {
555   -
556   - // TODO: voba HACK
557   -// PageData<RuleChain> ruleChainsByTenantIdAndEdgeId = ctx.getRuleChainService().findRuleChainsByTenantIdAndEdgeId(edgeEvent.getTenantId(), edgeEvent.getEdgeId(), new TimePageLink(100));
558   -// RuleChain ruleChain = ruleChainsByTenantIdAndEdgeId.getData().get(0);
559   -// deviceProfile.setDefaultRuleChainId(ruleChain.getId());
560   -
561 555 DeviceProfileUpdateMsg deviceProfileUpdateMsg =
562 556 ctx.getDeviceProfileMsgConstructor().constructDeviceProfileUpdatedMsg(msgType, deviceProfile);
563 557 downlinkMsg = DownlinkMsg.newBuilder()
... ... @@ -916,57 +910,57 @@ public final class EdgeGrpcSession implements Closeable {
916 910 private ListenableFuture<List<Void>> processUplinkMsg(UplinkMsg uplinkMsg) {
917 911 List<ListenableFuture<Void>> result = new ArrayList<>();
918 912 try {
919   - if (uplinkMsg.getEntityDataList() != null && !uplinkMsg.getEntityDataList().isEmpty()) {
  913 + if (uplinkMsg.getEntityDataCount() > 0) {
920 914 for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) {
921 915 result.addAll(ctx.getTelemetryProcessor().onTelemetryUpdate(edge.getTenantId(), entityData));
922 916 }
923 917 }
924   - if (uplinkMsg.getDeviceUpdateMsgList() != null && !uplinkMsg.getDeviceUpdateMsgList().isEmpty()) {
  918 + if (uplinkMsg.getDeviceUpdateMsgCount() > 0) {
925 919 for (DeviceUpdateMsg deviceUpdateMsg : uplinkMsg.getDeviceUpdateMsgList()) {
926 920 result.add(ctx.getDeviceProcessor().onDeviceUpdate(edge.getTenantId(), edge, deviceUpdateMsg));
927 921 }
928 922 }
929   - if (uplinkMsg.getDeviceCredentialsUpdateMsgList() != null && !uplinkMsg.getDeviceCredentialsUpdateMsgList().isEmpty()) {
  923 + if (uplinkMsg.getDeviceCredentialsUpdateMsgCount() > 0) {
930 924 for (DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg : uplinkMsg.getDeviceCredentialsUpdateMsgList()) {
931 925 result.add(ctx.getDeviceProcessor().onDeviceCredentialsUpdate(edge.getTenantId(), deviceCredentialsUpdateMsg));
932 926 }
933 927 }
934   - if (uplinkMsg.getAlarmUpdateMsgList() != null && !uplinkMsg.getAlarmUpdateMsgList().isEmpty()) {
  928 + if (uplinkMsg.getAlarmUpdateMsgCount() > 0) {
935 929 for (AlarmUpdateMsg alarmUpdateMsg : uplinkMsg.getAlarmUpdateMsgList()) {
936 930 result.add(ctx.getAlarmProcessor().onAlarmUpdate(edge.getTenantId(), alarmUpdateMsg));
937 931 }
938 932 }
939   - if (uplinkMsg.getRelationUpdateMsgList() != null && !uplinkMsg.getRelationUpdateMsgList().isEmpty()) {
  933 + if (uplinkMsg.getRelationUpdateMsgCount() > 0) {
940 934 for (RelationUpdateMsg relationUpdateMsg : uplinkMsg.getRelationUpdateMsgList()) {
941 935 result.add(ctx.getRelationProcessor().onRelationUpdate(edge.getTenantId(), relationUpdateMsg));
942 936 }
943 937 }
944   - if (uplinkMsg.getRuleChainMetadataRequestMsgList() != null && !uplinkMsg.getRuleChainMetadataRequestMsgList().isEmpty()) {
  938 + if (uplinkMsg.getRuleChainMetadataRequestMsgCount() > 0) {
945 939 for (RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg : uplinkMsg.getRuleChainMetadataRequestMsgList()) {
946 940 result.add(ctx.getSyncEdgeService().processRuleChainMetadataRequestMsg(edge, ruleChainMetadataRequestMsg));
947 941 }
948 942 }
949   - if (uplinkMsg.getAttributesRequestMsgList() != null && !uplinkMsg.getAttributesRequestMsgList().isEmpty()) {
  943 + if (uplinkMsg.getAttributesRequestMsgCount() > 0) {
950 944 for (AttributesRequestMsg attributesRequestMsg : uplinkMsg.getAttributesRequestMsgList()) {
951 945 result.add(ctx.getSyncEdgeService().processAttributesRequestMsg(edge, attributesRequestMsg));
952 946 }
953 947 }
954   - if (uplinkMsg.getRelationRequestMsgList() != null && !uplinkMsg.getRelationRequestMsgList().isEmpty()) {
  948 + if (uplinkMsg.getRelationRequestMsgCount() > 0) {
955 949 for (RelationRequestMsg relationRequestMsg : uplinkMsg.getRelationRequestMsgList()) {
956 950 result.add(ctx.getSyncEdgeService().processRelationRequestMsg(edge, relationRequestMsg));
957 951 }
958 952 }
959   - if (uplinkMsg.getUserCredentialsRequestMsgList() != null && !uplinkMsg.getUserCredentialsRequestMsgList().isEmpty()) {
  953 + if (uplinkMsg.getUserCredentialsRequestMsgCount() > 0) {
960 954 for (UserCredentialsRequestMsg userCredentialsRequestMsg : uplinkMsg.getUserCredentialsRequestMsgList()) {
961 955 result.add(ctx.getSyncEdgeService().processUserCredentialsRequestMsg(edge, userCredentialsRequestMsg));
962 956 }
963 957 }
964   - if (uplinkMsg.getDeviceCredentialsRequestMsgList() != null && !uplinkMsg.getDeviceCredentialsRequestMsgList().isEmpty()) {
  958 + if (uplinkMsg.getDeviceCredentialsRequestMsgCount() > 0) {
965 959 for (DeviceCredentialsRequestMsg deviceCredentialsRequestMsg : uplinkMsg.getDeviceCredentialsRequestMsgList()) {
966 960 result.add(ctx.getSyncEdgeService().processDeviceCredentialsRequestMsg(edge, deviceCredentialsRequestMsg));
967 961 }
968 962 }
969   - if (uplinkMsg.getDeviceRpcCallMsgList() != null && !uplinkMsg.getDeviceRpcCallMsgList().isEmpty()) {
  963 + if (uplinkMsg.getDeviceRpcCallMsgCount() > 0) {
970 964 for (DeviceRpcCallMsg deviceRpcCallMsg : uplinkMsg.getDeviceRpcCallMsgList()) {
971 965 result.add(ctx.getDeviceProcessor().processDeviceRpcCallResponseMsg(edge.getTenantId(), deviceRpcCallMsg));
972 966 }
... ...
... ... @@ -44,13 +44,14 @@ public class DeviceProfileMsgConstructor {
44 44 .setTransportType(deviceProfile.getTransportType().name())
45 45 .setProvisionType(deviceProfile.getProvisionType().name())
46 46 .setProfileDataBytes(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile.getProfileData())));
47   - if (deviceProfile.getDefaultRuleChainId() != null) {
48   - builder.setDefaultRuleChainIdMSB(deviceProfile.getDefaultRuleChainId().getId().getMostSignificantBits())
49   - .setDefaultRuleChainIdLSB(deviceProfile.getDefaultRuleChainId().getId().getLeastSignificantBits());
50   - }
51   - if (deviceProfile.getDefaultQueueName() != null) {
52   - builder.setDefaultQueueName(deviceProfile.getDefaultQueueName());
53   - }
  47 + // TODO: voba - should this be always null at the moment??
  48 +// if (deviceProfile.getDefaultRuleChainId() != null) {
  49 +// builder.setDefaultRuleChainIdMSB(deviceProfile.getDefaultRuleChainId().getId().getMostSignificantBits())
  50 +// .setDefaultRuleChainIdLSB(deviceProfile.getDefaultRuleChainId().getId().getLeastSignificantBits());
  51 +// }
  52 +// if (deviceProfile.getDefaultQueueName() != null) {
  53 +// builder.setDefaultQueueName(deviceProfile.getDefaultQueueName());
  54 +// }
54 55 if (deviceProfile.getProvisionDeviceKey() != null) {
55 56 builder.setProvisionDeviceKey(deviceProfile.getProvisionDeviceKey());
56 57 }
... ...
... ... @@ -42,6 +42,8 @@ import org.thingsboard.server.dao.entityview.EntityViewService;
42 42 import org.thingsboard.server.dao.relation.RelationService;
43 43 import org.thingsboard.server.dao.user.UserService;
44 44 import org.thingsboard.server.service.executors.DbCallbackExecutorService;
  45 +import org.thingsboard.server.service.profile.DefaultTbDeviceProfileCache;
  46 +import org.thingsboard.server.service.profile.TbDeviceProfileCache;
45 47 import org.thingsboard.server.service.queue.TbClusterService;
46 48 import org.thingsboard.server.service.state.DeviceStateService;
47 49
... ... @@ -57,6 +59,9 @@ public abstract class BaseProcessor {
57 59 protected DeviceService deviceService;
58 60
59 61 @Autowired
  62 + protected TbDeviceProfileCache deviceProfileCache;
  63 +
  64 + @Autowired
60 65 protected DashboardService dashboardService;
61 66
62 67 @Autowired
... ...
... ... @@ -21,11 +21,15 @@ import com.google.common.util.concurrent.ListenableFuture;
21 21 import com.google.common.util.concurrent.SettableFuture;
22 22 import com.google.gson.Gson;
23 23 import com.google.gson.JsonObject;
  24 +import groovy.lang.Tuple;
24 25 import lombok.extern.slf4j.Slf4j;
  26 +import org.javatuples.Pair;
  27 +import org.passay.Rule;
25 28 import org.springframework.stereotype.Component;
26 29 import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
27 30 import org.thingsboard.server.common.data.DataConstants;
28 31 import org.thingsboard.server.common.data.Device;
  32 +import org.thingsboard.server.common.data.DeviceProfile;
29 33 import org.thingsboard.server.common.data.EntityType;
30 34 import org.thingsboard.server.common.data.EntityView;
31 35 import org.thingsboard.server.common.data.asset.Asset;
... ... @@ -33,14 +37,18 @@ import org.thingsboard.server.common.data.id.AssetId;
33 37 import org.thingsboard.server.common.data.id.CustomerId;
34 38 import org.thingsboard.server.common.data.id.DashboardId;
35 39 import org.thingsboard.server.common.data.id.DeviceId;
  40 +import org.thingsboard.server.common.data.id.DeviceProfileId;
36 41 import org.thingsboard.server.common.data.id.EntityId;
37 42 import org.thingsboard.server.common.data.id.EntityViewId;
  43 +import org.thingsboard.server.common.data.id.RuleChainId;
38 44 import org.thingsboard.server.common.data.id.TenantId;
39 45 import org.thingsboard.server.common.data.id.UserId;
40 46 import org.thingsboard.server.common.data.kv.AttributeKey;
41 47 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  48 +import org.thingsboard.server.common.data.rule.RuleChain;
42 49 import org.thingsboard.server.common.msg.TbMsg;
43 50 import org.thingsboard.server.common.msg.TbMsgMetaData;
  51 +import org.thingsboard.server.common.msg.queue.ServiceQueue;
44 52 import org.thingsboard.server.common.msg.session.SessionMsgType;
45 53 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
46 54 import org.thingsboard.server.common.transport.util.JsonUtils;
... ... @@ -70,7 +78,9 @@ public class TelemetryProcessor extends BaseProcessor {
70 78 List<ListenableFuture<Void>> result = new ArrayList<>();
71 79 EntityId entityId = constructEntityId(entityData);
72 80 if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdatedMsg()) && entityId != null) {
73   - TbMsgMetaData metaData = constructBaseMsgMetadata(tenantId, entityId);
  81 + // TODO: voba - in terms of performance we should not fetch device from DB by id
  82 + // TbMsgMetaData metaData = constructBaseMsgMetadata(tenantId, entityId);
  83 + TbMsgMetaData metaData = new TbMsgMetaData();
74 84 metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE);
75 85 if (entityData.hasPostAttributesMsg()) {
76 86 result.add(processPostAttributes(tenantId, entityId, entityData.getPostAttributesMsg(), metaData));
... ... @@ -120,12 +130,36 @@ public class TelemetryProcessor extends BaseProcessor {
120 130 return metaData;
121 131 }
122 132
  133 + private Pair<String, RuleChainId> getDefaultQueueNameAndRuleChainId(TenantId tenantId, EntityId entityId) {
  134 + if (EntityType.DEVICE.equals(entityId.getEntityType())) {
  135 + DeviceProfile deviceProfile = deviceProfileCache.get(tenantId, new DeviceId(entityId.getId()));
  136 + RuleChainId ruleChainId;
  137 + String queueName;
  138 +
  139 + if (deviceProfile == null) {
  140 + log.warn("[{}] Device profile is null!", entityId);
  141 + ruleChainId = null;
  142 + queueName = ServiceQueue.MAIN;
  143 + } else {
  144 + ruleChainId = deviceProfile.getDefaultRuleChainId();
  145 + String defaultQueueName = deviceProfile.getDefaultQueueName();
  146 + queueName = defaultQueueName != null ? defaultQueueName : ServiceQueue.MAIN;
  147 + }
  148 + return new Pair<>(queueName, ruleChainId);
  149 + } else {
  150 + return new Pair<>(ServiceQueue.MAIN, null);
  151 + }
  152 + }
  153 +
123 154 private ListenableFuture<Void> processPostTelemetry(TenantId tenantId, EntityId entityId, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData metaData) {
124 155 SettableFuture<Void> futureToSet = SettableFuture.create();
125 156 for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
126 157 JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList());
127 158 metaData.putValue("ts", tsKv.getTs() + "");
128   - TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, metaData, gson.toJson(json));
  159 + Pair<String, RuleChainId> defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
  160 + String queueName = defaultQueueAndRuleChain.getValue0();
  161 + RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue1();
  162 + TbMsg tbMsg = TbMsg.newMsg(queueName, SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, metaData, gson.toJson(json), ruleChainId, null);
129 163 tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
130 164 @Override
131 165 public void onSuccess(TbQueueMsgMetadata metadata) {
... ... @@ -145,7 +179,10 @@ public class TelemetryProcessor extends BaseProcessor {
145 179 private ListenableFuture<Void> processPostAttributes(TenantId tenantId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) {
146 180 SettableFuture<Void> futureToSet = SettableFuture.create();
147 181 JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
148   - TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, metaData, gson.toJson(json));
  182 + Pair<String, RuleChainId> defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
  183 + String queueName = defaultQueueAndRuleChain.getValue0();
  184 + RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue1();
  185 + TbMsg tbMsg = TbMsg.newMsg(queueName, SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, metaData, gson.toJson(json), ruleChainId, null);
149 186 tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
150 187 @Override
151 188 public void onSuccess(TbQueueMsgMetadata metadata) {
... ... @@ -169,7 +206,10 @@ public class TelemetryProcessor extends BaseProcessor {
169 206 Futures.addCallback(future, new FutureCallback<List<Void>>() {
170 207 @Override
171 208 public void onSuccess(@Nullable List<Void> voids) {
172   - TbMsg tbMsg = TbMsg.newMsg(DataConstants.ATTRIBUTES_UPDATED, entityId, metaData, gson.toJson(json));
  209 + Pair<String, RuleChainId> defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
  210 + String queueName = defaultQueueAndRuleChain.getValue0();
  211 + RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue1();
  212 + TbMsg tbMsg = TbMsg.newMsg(queueName, DataConstants.ATTRIBUTES_UPDATED, entityId, metaData, gson.toJson(json), ruleChainId, null);
173 213 tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
174 214 @Override
175 215 public void onSuccess(TbQueueMsgMetadata metadata) {
... ...