Commit 22755e87637d8f49aa3bf661a894ab7e31b0b30d
1 parent
f39d6638
Device Profile implementation for Transport
Showing
42 changed files
with
557 additions
and
88 deletions
... | ... | @@ -47,10 +47,6 @@ |
47 | 47 | |
48 | 48 | <dependencies> |
49 | 49 | <dependency> |
50 | - <groupId>de.ruedigermoeller</groupId> | |
51 | - <artifactId>fst</artifactId> | |
52 | - </dependency> | |
53 | - <dependency> | |
54 | 50 | <groupId>io.netty</groupId> |
55 | 51 | <artifactId>netty-transport-native-epoll</artifactId> |
56 | 52 | <version>${netty.version}</version> | ... | ... |
... | ... | @@ -44,7 +44,6 @@ import org.thingsboard.server.common.msg.TbMsg; |
44 | 44 | import org.thingsboard.server.common.msg.queue.ServiceType; |
45 | 45 | import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
46 | 46 | import org.thingsboard.server.common.msg.tools.TbRateLimits; |
47 | -import org.thingsboard.server.dao.alarm.AlarmService; | |
48 | 47 | import org.thingsboard.server.dao.asset.AssetService; |
49 | 48 | import org.thingsboard.server.dao.attributes.AttributesService; |
50 | 49 | import org.thingsboard.server.dao.audit.AuditLogService; |
... | ... | @@ -65,7 +64,7 @@ import org.thingsboard.server.dao.user.UserService; |
65 | 64 | import org.thingsboard.server.queue.discovery.PartitionService; |
66 | 65 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
67 | 66 | import org.thingsboard.server.service.component.ComponentDiscoveryService; |
68 | -import org.thingsboard.server.service.encoding.DataDecodingEncodingService; | |
67 | +import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; | |
69 | 68 | import org.thingsboard.server.service.executors.DbCallbackExecutorService; |
70 | 69 | import org.thingsboard.server.service.executors.ExternalCallExecutorService; |
71 | 70 | import org.thingsboard.server.service.executors.SharedEventLoopGroupService; |
... | ... | @@ -90,7 +89,6 @@ import java.util.concurrent.ConcurrentHashMap; |
90 | 89 | import java.util.concurrent.ConcurrentMap; |
91 | 90 | import java.util.concurrent.ScheduledExecutorService; |
92 | 91 | import java.util.concurrent.TimeUnit; |
93 | -import java.util.concurrent.atomic.AtomicInteger; | |
94 | 92 | |
95 | 93 | @Slf4j |
96 | 94 | @Component | ... | ... |
... | ... | @@ -92,6 +92,8 @@ public class DeviceProfileController extends BaseController { |
92 | 92 | |
93 | 93 | DeviceProfile savedDeviceProfile = checkNotNull(deviceProfileService.saveDeviceProfile(deviceProfile)); |
94 | 94 | |
95 | + tbClusterService.onDeviceProfileChange(savedDeviceProfile, null); | |
96 | + | |
95 | 97 | logEntityAction(savedDeviceProfile.getId(), savedDeviceProfile, |
96 | 98 | null, |
97 | 99 | savedDeviceProfile.getId() == null ? ActionType.ADDED : ActionType.UPDATED, null); | ... | ... |
... | ... | @@ -21,6 +21,7 @@ import org.springframework.beans.factory.annotation.Value; |
21 | 21 | import org.springframework.scheduling.annotation.Scheduled; |
22 | 22 | import org.springframework.stereotype.Service; |
23 | 23 | import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg; |
24 | +import org.thingsboard.server.common.data.DeviceProfile; | |
24 | 25 | import org.thingsboard.server.common.data.EntityType; |
25 | 26 | import org.thingsboard.server.common.data.id.EntityId; |
26 | 27 | import org.thingsboard.server.common.data.id.TenantId; |
... | ... | @@ -29,6 +30,7 @@ import org.thingsboard.server.common.msg.TbMsg; |
29 | 30 | import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; |
30 | 31 | import org.thingsboard.server.common.msg.queue.ServiceType; |
31 | 32 | import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
33 | +import org.thingsboard.server.gen.transport.TransportProtos; | |
32 | 34 | import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto; |
33 | 35 | import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; |
34 | 36 | import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; |
... | ... | @@ -40,7 +42,7 @@ import org.thingsboard.server.queue.TbQueueProducer; |
40 | 42 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
41 | 43 | import org.thingsboard.server.queue.discovery.PartitionService; |
42 | 44 | import org.thingsboard.server.queue.provider.TbQueueProducerProvider; |
43 | -import org.thingsboard.server.service.encoding.DataDecodingEncodingService; | |
45 | +import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; | |
44 | 46 | import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; |
45 | 47 | |
46 | 48 | import java.util.HashSet; |
... | ... | @@ -163,6 +165,20 @@ public class DefaultTbClusterService implements TbClusterService { |
163 | 165 | broadcast(new ComponentLifecycleMsg(tenantId, entityId, state)); |
164 | 166 | } |
165 | 167 | |
168 | + @Override | |
169 | + public void onDeviceProfileChange(DeviceProfile deviceProfile, TbQueueCallback callback) { | |
170 | + log.trace("[{}][{}] Processing device profile [{}] event", deviceProfile.getTenantId(), deviceProfile.getId(), deviceProfile.getName()); | |
171 | + TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> toTransportNfProducer = producerProvider.getTransportNotificationsMsgProducer(); | |
172 | + Set<String> tbTransportServices = partitionService.getAllServiceIds(ServiceType.TB_TRANSPORT); | |
173 | + TransportProtos.DeviceProfileUpdateMsg profileUpdateMsg = TransportProtos.DeviceProfileUpdateMsg.newBuilder().setData(ByteString.copyFrom(encodingService.encode(deviceProfile))).build(); | |
174 | + ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setDeviceProfileUpdateMsg(profileUpdateMsg).build(); | |
175 | + for (String transportServiceId : tbTransportServices) { | |
176 | + TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportServiceId); | |
177 | + toTransportNfProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), transportMsg), null); | |
178 | + toTransportNfs.incrementAndGet(); | |
179 | + } | |
180 | + } | |
181 | + | |
166 | 182 | private void broadcast(ComponentLifecycleMsg msg) { |
167 | 183 | byte[] msgBytes = encodingService.encode(msg); |
168 | 184 | TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> toRuleEngineProducer = producerProvider.getRuleEngineNotificationsMsgProducer(); | ... | ... |
... | ... | @@ -47,7 +47,7 @@ import org.thingsboard.server.queue.discovery.PartitionChangeEvent; |
47 | 47 | import org.thingsboard.server.queue.provider.TbCoreQueueFactory; |
48 | 48 | import org.thingsboard.server.common.stats.StatsFactory; |
49 | 49 | import org.thingsboard.server.queue.util.TbCoreComponent; |
50 | -import org.thingsboard.server.service.encoding.DataDecodingEncodingService; | |
50 | +import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; | |
51 | 51 | import org.thingsboard.server.service.queue.processing.AbstractConsumerService; |
52 | 52 | import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; |
53 | 53 | import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; | ... | ... |
... | ... | @@ -37,7 +37,7 @@ import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory; |
37 | 37 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; |
38 | 38 | import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; |
39 | 39 | import org.thingsboard.server.queue.util.TbRuleEngineComponent; |
40 | -import org.thingsboard.server.service.encoding.DataDecodingEncodingService; | |
40 | +import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; | |
41 | 41 | import org.thingsboard.server.service.queue.processing.*; |
42 | 42 | import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; |
43 | 43 | import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService; | ... | ... |
... | ... | @@ -16,6 +16,7 @@ |
16 | 16 | package org.thingsboard.server.service.queue; |
17 | 17 | |
18 | 18 | import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg; |
19 | +import org.thingsboard.server.common.data.DeviceProfile; | |
19 | 20 | import org.thingsboard.server.common.data.id.EntityId; |
20 | 21 | import org.thingsboard.server.common.data.id.TenantId; |
21 | 22 | import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; |
... | ... | @@ -49,4 +50,6 @@ public interface TbClusterService { |
49 | 50 | |
50 | 51 | void onEntityStateChange(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state); |
51 | 52 | |
53 | + void onDeviceProfileChange(DeviceProfile deviceProfile, TbQueueCallback callback); | |
54 | + | |
52 | 55 | } | ... | ... |
... | ... | @@ -26,7 +26,7 @@ import org.thingsboard.server.common.msg.queue.TbCallback; |
26 | 26 | import org.thingsboard.server.queue.TbQueueConsumer; |
27 | 27 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
28 | 28 | import org.thingsboard.server.queue.discovery.PartitionChangeEvent; |
29 | -import org.thingsboard.server.service.encoding.DataDecodingEncodingService; | |
29 | +import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; | |
30 | 30 | import org.thingsboard.server.service.queue.TbPackCallback; |
31 | 31 | import org.thingsboard.server.service.queue.TbPackProcessingContext; |
32 | 32 | ... | ... |
... | ... | @@ -21,16 +21,17 @@ import com.fasterxml.jackson.databind.node.ObjectNode; |
21 | 21 | import com.google.common.util.concurrent.Futures; |
22 | 22 | import com.google.common.util.concurrent.ListenableFuture; |
23 | 23 | import com.google.common.util.concurrent.MoreExecutors; |
24 | +import com.google.protobuf.ByteString; | |
24 | 25 | import lombok.extern.slf4j.Slf4j; |
25 | -import org.springframework.beans.factory.annotation.Autowired; | |
26 | 26 | import org.springframework.stereotype.Service; |
27 | 27 | import org.springframework.util.StringUtils; |
28 | 28 | import org.thingsboard.server.common.data.DataConstants; |
29 | 29 | import org.thingsboard.server.common.data.Device; |
30 | -import org.thingsboard.server.common.data.Tenant; | |
30 | +import org.thingsboard.server.common.data.DeviceProfile; | |
31 | 31 | import org.thingsboard.server.common.data.TenantProfile; |
32 | 32 | import org.thingsboard.server.common.data.id.CustomerId; |
33 | 33 | import org.thingsboard.server.common.data.id.DeviceId; |
34 | +import org.thingsboard.server.common.data.id.DeviceProfileId; | |
34 | 35 | import org.thingsboard.server.common.data.id.TenantId; |
35 | 36 | import org.thingsboard.server.common.data.relation.EntityRelation; |
36 | 37 | import org.thingsboard.server.common.data.security.DeviceCredentials; |
... | ... | @@ -38,11 +39,14 @@ import org.thingsboard.server.common.data.security.DeviceCredentialsType; |
38 | 39 | import org.thingsboard.server.common.msg.TbMsg; |
39 | 40 | import org.thingsboard.server.common.msg.TbMsgDataType; |
40 | 41 | import org.thingsboard.server.common.msg.TbMsgMetaData; |
42 | +import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; | |
41 | 43 | import org.thingsboard.server.dao.device.DeviceCredentialsService; |
44 | +import org.thingsboard.server.dao.device.DeviceProfileService; | |
42 | 45 | import org.thingsboard.server.dao.device.DeviceService; |
43 | 46 | import org.thingsboard.server.dao.relation.RelationService; |
44 | 47 | import org.thingsboard.server.dao.tenant.TenantProfileService; |
45 | 48 | import org.thingsboard.server.dao.tenant.TenantService; |
49 | +import org.thingsboard.server.gen.transport.TransportProtos; | |
46 | 50 | import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto; |
47 | 51 | import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg; |
48 | 52 | import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg; |
... | ... | @@ -76,47 +80,60 @@ public class DefaultTransportApiService implements TransportApiService { |
76 | 80 | private static final ObjectMapper mapper = new ObjectMapper(); |
77 | 81 | |
78 | 82 | //TODO: Constructor dependencies; |
79 | - @Autowired | |
80 | - private TenantService tenantService; | |
81 | - | |
82 | - @Autowired | |
83 | - private TenantProfileService tenantProfileService; | |
84 | - | |
85 | - @Autowired | |
86 | - private DeviceService deviceService; | |
87 | - | |
88 | - @Autowired | |
89 | - private RelationService relationService; | |
90 | - | |
91 | - @Autowired | |
92 | - private DeviceCredentialsService deviceCredentialsService; | |
93 | - | |
94 | - @Autowired | |
95 | - private DeviceStateService deviceStateService; | |
96 | - | |
97 | - @Autowired | |
98 | - private DbCallbackExecutorService dbCallbackExecutorService; | |
99 | - | |
100 | - @Autowired | |
101 | - protected TbClusterService tbClusterService; | |
83 | + private final DeviceProfileService deviceProfileService; | |
84 | + private final TenantService tenantService; | |
85 | + private final TenantProfileService tenantProfileService; | |
86 | + private final DeviceService deviceService; | |
87 | + private final RelationService relationService; | |
88 | + private final DeviceCredentialsService deviceCredentialsService; | |
89 | + private final DeviceStateService deviceStateService; | |
90 | + private final DbCallbackExecutorService dbCallbackExecutorService; | |
91 | + private final TbClusterService tbClusterService; | |
92 | + private final DataDecodingEncodingService dataDecodingEncodingService; | |
102 | 93 | |
103 | 94 | private final ConcurrentMap<String, ReentrantLock> deviceCreationLocks = new ConcurrentHashMap<>(); |
104 | 95 | |
96 | + public DefaultTransportApiService(DeviceProfileService deviceProfileService, TenantService tenantService, | |
97 | + TenantProfileService tenantProfileService, DeviceService deviceService, | |
98 | + RelationService relationService, DeviceCredentialsService deviceCredentialsService, | |
99 | + DeviceStateService deviceStateService, DbCallbackExecutorService dbCallbackExecutorService, | |
100 | + TbClusterService tbClusterService, DataDecodingEncodingService dataDecodingEncodingService) { | |
101 | + this.deviceProfileService = deviceProfileService; | |
102 | + this.tenantService = tenantService; | |
103 | + this.tenantProfileService = tenantProfileService; | |
104 | + this.deviceService = deviceService; | |
105 | + this.relationService = relationService; | |
106 | + this.deviceCredentialsService = deviceCredentialsService; | |
107 | + this.deviceStateService = deviceStateService; | |
108 | + this.dbCallbackExecutorService = dbCallbackExecutorService; | |
109 | + this.tbClusterService = tbClusterService; | |
110 | + this.dataDecodingEncodingService = dataDecodingEncodingService; | |
111 | + } | |
112 | + | |
105 | 113 | @Override |
106 | 114 | public ListenableFuture<TbProtoQueueMsg<TransportApiResponseMsg>> handle(TbProtoQueueMsg<TransportApiRequestMsg> tbProtoQueueMsg) { |
107 | 115 | TransportApiRequestMsg transportApiRequestMsg = tbProtoQueueMsg.getValue(); |
108 | 116 | if (transportApiRequestMsg.hasValidateTokenRequestMsg()) { |
109 | 117 | ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg(); |
110 | - return Futures.transform(validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); | |
118 | + return Futures.transform(validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN), | |
119 | + value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); | |
111 | 120 | } else if (transportApiRequestMsg.hasValidateX509CertRequestMsg()) { |
112 | 121 | ValidateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateX509CertRequestMsg(); |
113 | - return Futures.transform(validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); | |
122 | + return Futures.transform(validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE), | |
123 | + value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); | |
114 | 124 | } else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) { |
115 | - return Futures.transform(handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg()), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); | |
125 | + return Futures.transform(handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg()), | |
126 | + value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); | |
116 | 127 | } else if (transportApiRequestMsg.hasGetTenantRoutingInfoRequestMsg()) { |
117 | - return Futures.transform(handle(transportApiRequestMsg.getGetTenantRoutingInfoRequestMsg()), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); | |
128 | + return Futures.transform(handle(transportApiRequestMsg.getGetTenantRoutingInfoRequestMsg()), | |
129 | + value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); | |
130 | + } else if (transportApiRequestMsg.hasGetDeviceProfileRequestMsg()) { | |
131 | + return Futures.transform(handle(transportApiRequestMsg.getGetDeviceProfileRequestMsg()), | |
132 | + value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); | |
133 | + | |
118 | 134 | } |
119 | - return Futures.transform(getEmptyTransportApiResponseFuture(), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); | |
135 | + return Futures.transform(getEmptyTransportApiResponseFuture(), | |
136 | + value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); | |
120 | 137 | } |
121 | 138 | |
122 | 139 | private ListenableFuture<TransportApiResponseMsg> validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) { |
... | ... | @@ -160,10 +177,19 @@ public class DefaultTransportApiService implements TransportApiService { |
160 | 177 | TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId, metaData, TbMsgDataType.JSON, mapper.writeValueAsString(entityNode)); |
161 | 178 | tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, null); |
162 | 179 | } |
180 | + GetOrCreateDeviceFromGatewayResponseMsg.Builder builder = GetOrCreateDeviceFromGatewayResponseMsg.newBuilder() | |
181 | + .setDeviceInfo(getDeviceInfoProto(device)); | |
182 | + DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileById(device.getTenantId(), device.getDeviceProfileId()); | |
183 | + if (deviceProfile != null) { | |
184 | + builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile))); | |
185 | + } else { | |
186 | + log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId()); | |
187 | + } | |
163 | 188 | return TransportApiResponseMsg.newBuilder() |
164 | - .setGetOrCreateDeviceResponseMsg(GetOrCreateDeviceFromGatewayResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).build()).build(); | |
189 | + .setGetOrCreateDeviceResponseMsg(builder.build()) | |
190 | + .build(); | |
165 | 191 | } catch (JsonProcessingException e) { |
166 | - log.warn("[{}] Failed to lookup device by gateway id and name", gatewayId, requestMsg.getDeviceName(), e); | |
192 | + log.warn("[{}] Failed to lookup device by gateway id and name: [{}]", gatewayId, requestMsg.getDeviceName(), e); | |
167 | 193 | throw new RuntimeException(e); |
168 | 194 | } finally { |
169 | 195 | deviceCreationLock.unlock(); |
... | ... | @@ -182,6 +208,16 @@ public class DefaultTransportApiService implements TransportApiService { |
182 | 208 | .setIsolatedTbRuleEngine(tenantProfile.isIsolatedTbRuleEngine()).build()).build(), dbCallbackExecutorService); |
183 | 209 | } |
184 | 210 | |
211 | + private ListenableFuture<TransportApiResponseMsg> handle(TransportProtos.GetDeviceProfileRequestMsg requestMsg) { | |
212 | + DeviceProfileId profileId = new DeviceProfileId(new UUID(requestMsg.getProfileIdMSB(), requestMsg.getProfileIdLSB())); | |
213 | + DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileById(TenantId.SYS_TENANT_ID, profileId); | |
214 | + return Futures.immediateFuture(TransportApiResponseMsg.newBuilder() | |
215 | + .setGetDeviceProfileResponseMsg( | |
216 | + TransportProtos.GetDeviceProfileResponseMsg.newBuilder() | |
217 | + .setData(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile))) | |
218 | + .build()).build()); | |
219 | + } | |
220 | + | |
185 | 221 | private ListenableFuture<TransportApiResponseMsg> getDeviceInfo(DeviceId deviceId, DeviceCredentials credentials) { |
186 | 222 | return Futures.transform(deviceService.findDeviceByIdAsync(TenantId.SYS_TENANT_ID, deviceId), device -> { |
187 | 223 | if (device == null) { |
... | ... | @@ -191,6 +227,12 @@ public class DefaultTransportApiService implements TransportApiService { |
191 | 227 | try { |
192 | 228 | ValidateDeviceCredentialsResponseMsg.Builder builder = ValidateDeviceCredentialsResponseMsg.newBuilder(); |
193 | 229 | builder.setDeviceInfo(getDeviceInfoProto(device)); |
230 | + DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileById(device.getTenantId(), device.getDeviceProfileId()); | |
231 | + if (deviceProfile != null) { | |
232 | + builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile))); | |
233 | + } else { | |
234 | + log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId()); | |
235 | + } | |
194 | 236 | if (!StringUtils.isEmpty(credentials.getCredentialsValue())) { |
195 | 237 | builder.setCredentialsBody(credentials.getCredentialsValue()); |
196 | 238 | } |
... | ... | @@ -211,6 +253,8 @@ public class DefaultTransportApiService implements TransportApiService { |
211 | 253 | .setDeviceIdLSB(device.getId().getId().getLeastSignificantBits()) |
212 | 254 | .setDeviceName(device.getName()) |
213 | 255 | .setDeviceType(device.getType()) |
256 | + .setDeviceProfileIdMSB(device.getDeviceProfileId().getId().getMostSignificantBits()) | |
257 | + .setDeviceProfileIdLSB(device.getDeviceProfileId().getId().getLeastSignificantBits()) | |
214 | 258 | .setAdditionalInfo(mapper.writeValueAsString(device.getAdditionalInfo())) |
215 | 259 | .build(); |
216 | 260 | } | ... | ... |
... | ... | @@ -19,6 +19,7 @@ import com.fasterxml.jackson.core.type.TypeReference; |
19 | 19 | import org.junit.After; |
20 | 20 | import org.junit.Assert; |
21 | 21 | import org.junit.Before; |
22 | +import org.junit.Ignore; | |
22 | 23 | import org.junit.Test; |
23 | 24 | import org.thingsboard.server.common.data.Device; |
24 | 25 | import org.thingsboard.server.common.data.DeviceProfile; |
... | ... | @@ -151,11 +152,13 @@ public abstract class BaseDeviceProfileControllerTest extends AbstractController |
151 | 152 | .andExpect(statusReason(containsString("Device profile with such name already exists"))); |
152 | 153 | } |
153 | 154 | |
155 | + @Ignore | |
154 | 156 | @Test |
155 | 157 | public void testSaveSameDeviceProfileWithDifferentType() throws Exception { |
156 | 158 | DeviceProfile deviceProfile = this.createDeviceProfile("Device Profile"); |
157 | 159 | DeviceProfile savedDeviceProfile = doPost("/api/deviceProfile", deviceProfile, DeviceProfile.class); |
158 | - savedDeviceProfile.setType(DeviceProfileType.LWM2M); | |
160 | + //TODO uncomment once we have other device types; | |
161 | + //savedDeviceProfile.setType(DeviceProfileType.LWM2M); | |
159 | 162 | doPost("/api/deviceProfile", savedDeviceProfile).andExpect(status().isBadRequest()) |
160 | 163 | .andExpect(statusReason(containsString("Changing type of device profile is prohibited"))); |
161 | 164 | } |
... | ... | @@ -265,7 +268,7 @@ public abstract class BaseDeviceProfileControllerTest extends AbstractController |
265 | 268 | Collections.sort(loadedDeviceProfileInfos, deviceProfileInfoIdComparator); |
266 | 269 | |
267 | 270 | List<DeviceProfileInfo> deviceProfileInfos = deviceProfiles.stream().map(deviceProfile -> new DeviceProfileInfo(deviceProfile.getId(), |
268 | - deviceProfile.getName(), deviceProfile.getType())).collect(Collectors.toList()); | |
271 | + deviceProfile.getName(), deviceProfile.getType(), deviceProfile.getTransportType())).collect(Collectors.toList()); | |
269 | 272 | |
270 | 273 | Assert.assertEquals(deviceProfileInfos, loadedDeviceProfileInfos); |
271 | 274 | ... | ... |
... | ... | @@ -40,6 +40,7 @@ public class DeviceProfile extends SearchTextBased<DeviceProfileId> implements H |
40 | 40 | private String description; |
41 | 41 | private boolean isDefault; |
42 | 42 | private DeviceProfileType type; |
43 | + private DeviceTransportType transportType; | |
43 | 44 | private RuleChainId defaultRuleChainId; |
44 | 45 | private transient DeviceProfileData profileData; |
45 | 46 | @JsonIgnore | ... | ... |
... | ... | @@ -31,18 +31,22 @@ import java.util.UUID; |
31 | 31 | public class DeviceProfileInfo extends EntityInfo { |
32 | 32 | |
33 | 33 | private final DeviceProfileType type; |
34 | + private final DeviceTransportType transportType; | |
34 | 35 | |
35 | 36 | @JsonCreator |
36 | 37 | public DeviceProfileInfo(@JsonProperty("id") EntityId id, |
37 | 38 | @JsonProperty("name") String name, |
38 | - @JsonProperty("type") DeviceProfileType type) { | |
39 | + @JsonProperty("type") DeviceProfileType type, | |
40 | + @JsonProperty("transportType") DeviceTransportType transportType) { | |
39 | 41 | super(id, name); |
40 | 42 | this.type = type; |
43 | + this.transportType = transportType; | |
41 | 44 | } |
42 | 45 | |
43 | - public DeviceProfileInfo(UUID uuid, String name, DeviceProfileType type) { | |
46 | + public DeviceProfileInfo(UUID uuid, String name, DeviceProfileType type, DeviceTransportType transportType) { | |
44 | 47 | super(EntityIdFactory.getByTypeAndUuid(EntityType.DEVICE_PROFILE, uuid), name); |
45 | 48 | this.type = type; |
49 | + this.transportType = transportType; | |
46 | 50 | } |
47 | 51 | |
48 | 52 | } | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2020 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.data; | |
17 | + | |
18 | +public enum DeviceTransportType { | |
19 | + DEFAULT, | |
20 | + MQTT, | |
21 | + LWM2M | |
22 | +} | ... | ... |
common/data/src/main/java/org/thingsboard/server/common/data/device/data/DefaultDeviceTransportConfiguration.java
renamed from
common/data/src/main/java/org/thingsboard/server/common/data/device/data/Lwm2mDeviceConfiguration.java
... | ... | @@ -17,13 +17,14 @@ package org.thingsboard.server.common.data.device.data; |
17 | 17 | |
18 | 18 | import lombok.Data; |
19 | 19 | import org.thingsboard.server.common.data.DeviceProfileType; |
20 | +import org.thingsboard.server.common.data.DeviceTransportType; | |
20 | 21 | |
21 | 22 | @Data |
22 | -public class Lwm2mDeviceConfiguration implements DeviceConfiguration { | |
23 | +public class DefaultDeviceTransportConfiguration implements DeviceTransportConfiguration { | |
23 | 24 | |
24 | 25 | @Override |
25 | - public DeviceProfileType getType() { | |
26 | - return DeviceProfileType.LWM2M; | |
26 | + public DeviceTransportType getType() { | |
27 | + return DeviceTransportType.DEFAULT; | |
27 | 28 | } |
28 | 29 | |
29 | 30 | } | ... | ... |
... | ... | @@ -27,8 +27,7 @@ import org.thingsboard.server.common.data.DeviceProfileType; |
27 | 27 | include = JsonTypeInfo.As.PROPERTY, |
28 | 28 | property = "type") |
29 | 29 | @JsonSubTypes({ |
30 | - @JsonSubTypes.Type(value = DefaultDeviceConfiguration.class, name = "DEFAULT"), | |
31 | - @JsonSubTypes.Type(value = Lwm2mDeviceConfiguration.class, name = "LWM2M")}) | |
30 | + @JsonSubTypes.Type(value = DefaultDeviceConfiguration.class, name = "DEFAULT")}) | |
32 | 31 | public interface DeviceConfiguration { |
33 | 32 | |
34 | 33 | @JsonIgnore | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2020 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.data.device.data; | |
17 | + | |
18 | +import com.fasterxml.jackson.annotation.JsonIgnore; | |
19 | +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; | |
20 | +import com.fasterxml.jackson.annotation.JsonSubTypes; | |
21 | +import com.fasterxml.jackson.annotation.JsonTypeInfo; | |
22 | +import org.thingsboard.server.common.data.DeviceTransportType; | |
23 | + | |
24 | +@JsonIgnoreProperties(ignoreUnknown = true) | |
25 | +@JsonTypeInfo( | |
26 | + use = JsonTypeInfo.Id.NAME, | |
27 | + include = JsonTypeInfo.As.PROPERTY, | |
28 | + property = "type") | |
29 | +@JsonSubTypes({ | |
30 | + @JsonSubTypes.Type(value = DefaultDeviceTransportConfiguration.class, name = "DEFAULT"), | |
31 | + @JsonSubTypes.Type(value = MqttDeviceTransportConfiguration.class, name = "MQTT"), | |
32 | + @JsonSubTypes.Type(value = Lwm2mDeviceTransportConfiguration.class, name = "LWM2M")}) | |
33 | +public interface DeviceTransportConfiguration { | |
34 | + | |
35 | + @JsonIgnore | |
36 | + DeviceTransportType getType(); | |
37 | + | |
38 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2020 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.data.device.data; | |
17 | + | |
18 | +import lombok.Data; | |
19 | +import org.thingsboard.server.common.data.DeviceProfileType; | |
20 | +import org.thingsboard.server.common.data.DeviceTransportType; | |
21 | + | |
22 | +@Data | |
23 | +public class Lwm2mDeviceTransportConfiguration implements DeviceTransportConfiguration { | |
24 | + | |
25 | + @Override | |
26 | + public DeviceTransportType getType() { | |
27 | + return DeviceTransportType.LWM2M; | |
28 | + } | |
29 | + | |
30 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2020 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.data.device.data; | |
17 | + | |
18 | +import lombok.Data; | |
19 | +import org.thingsboard.server.common.data.DeviceTransportType; | |
20 | + | |
21 | +@Data | |
22 | +public class MqttDeviceTransportConfiguration implements DeviceTransportConfiguration { | |
23 | + | |
24 | + @Override | |
25 | + public DeviceTransportType getType() { | |
26 | + return DeviceTransportType.MQTT; | |
27 | + } | |
28 | + | |
29 | +} | ... | ... |
common/data/src/main/java/org/thingsboard/server/common/data/device/profile/DefaultDeviceProfileTransportConfiguration.java
renamed from
common/data/src/main/java/org/thingsboard/server/common/data/device/profile/Lwm2mDeviceProfileConfiguration.java
... | ... | @@ -17,13 +17,14 @@ package org.thingsboard.server.common.data.device.profile; |
17 | 17 | |
18 | 18 | import lombok.Data; |
19 | 19 | import org.thingsboard.server.common.data.DeviceProfileType; |
20 | +import org.thingsboard.server.common.data.DeviceTransportType; | |
20 | 21 | |
21 | 22 | @Data |
22 | -public class Lwm2mDeviceProfileConfiguration implements DeviceProfileConfiguration { | |
23 | +public class DefaultDeviceProfileTransportConfiguration implements DeviceProfileTransportConfiguration { | |
23 | 24 | |
24 | 25 | @Override |
25 | - public DeviceProfileType getType() { | |
26 | - return DeviceProfileType.LWM2M; | |
26 | + public DeviceTransportType getType() { | |
27 | + return DeviceTransportType.DEFAULT; | |
27 | 28 | } |
28 | 29 | |
29 | 30 | } | ... | ... |
... | ... | @@ -27,8 +27,7 @@ import org.thingsboard.server.common.data.DeviceProfileType; |
27 | 27 | include = JsonTypeInfo.As.PROPERTY, |
28 | 28 | property = "type") |
29 | 29 | @JsonSubTypes({ |
30 | - @JsonSubTypes.Type(value = DefaultDeviceProfileConfiguration.class, name = "DEFAULT"), | |
31 | - @JsonSubTypes.Type(value = Lwm2mDeviceProfileConfiguration.class, name = "LWM2M")}) | |
30 | + @JsonSubTypes.Type(value = DefaultDeviceProfileConfiguration.class, name = "DEFAULT")}) | |
32 | 31 | public interface DeviceProfileConfiguration { |
33 | 32 | |
34 | 33 | @JsonIgnore | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2020 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.data.device.profile; | |
17 | + | |
18 | +import com.fasterxml.jackson.annotation.JsonIgnore; | |
19 | +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; | |
20 | +import com.fasterxml.jackson.annotation.JsonSubTypes; | |
21 | +import com.fasterxml.jackson.annotation.JsonTypeInfo; | |
22 | +import org.thingsboard.server.common.data.DeviceProfileType; | |
23 | +import org.thingsboard.server.common.data.DeviceTransportType; | |
24 | + | |
25 | +@JsonIgnoreProperties(ignoreUnknown = true) | |
26 | +@JsonTypeInfo( | |
27 | + use = JsonTypeInfo.Id.NAME, | |
28 | + include = JsonTypeInfo.As.PROPERTY, | |
29 | + property = "type") | |
30 | +@JsonSubTypes({ | |
31 | + @JsonSubTypes.Type(value = DefaultDeviceProfileTransportConfiguration.class, name = "DEFAULT"), | |
32 | + @JsonSubTypes.Type(value = MqttDeviceProfileTransportConfiguration.class, name = "MQTT"), | |
33 | + @JsonSubTypes.Type(value = Lwm2mDeviceProfileTransportConfiguration.class, name = "LWM2M")}) | |
34 | +public interface DeviceProfileTransportConfiguration { | |
35 | + | |
36 | + @JsonIgnore | |
37 | + DeviceTransportType getType(); | |
38 | + | |
39 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2020 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.data.device.profile; | |
17 | + | |
18 | +import lombok.Data; | |
19 | +import org.thingsboard.server.common.data.DeviceProfileType; | |
20 | +import org.thingsboard.server.common.data.DeviceTransportType; | |
21 | + | |
22 | +@Data | |
23 | +public class Lwm2mDeviceProfileTransportConfiguration implements DeviceProfileTransportConfiguration { | |
24 | + | |
25 | + @Override | |
26 | + public DeviceTransportType getType() { | |
27 | + return DeviceTransportType.LWM2M; | |
28 | + } | |
29 | + | |
30 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2020 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.data.device.profile; | |
17 | + | |
18 | +import lombok.Data; | |
19 | +import org.thingsboard.server.common.data.DeviceTransportType; | |
20 | + | |
21 | +@Data | |
22 | +public class MqttDeviceProfileTransportConfiguration implements DeviceProfileTransportConfiguration { | |
23 | + | |
24 | + @Override | |
25 | + public DeviceTransportType getType() { | |
26 | + return DeviceTransportType.MQTT; | |
27 | + } | |
28 | + | |
29 | +} | ... | ... |
... | ... | @@ -51,6 +51,8 @@ message SessionInfoProto { |
51 | 51 | string deviceType = 9; |
52 | 52 | int64 gwSessionIdMSB = 10; |
53 | 53 | int64 gwSessionIdLSB = 11; |
54 | + int64 deviceProfileIdMSB = 12; | |
55 | + int64 deviceProfileIdLSB = 13; | |
54 | 56 | } |
55 | 57 | |
56 | 58 | enum SessionEvent { |
... | ... | @@ -99,6 +101,8 @@ message DeviceInfoProto { |
99 | 101 | string deviceName = 5; |
100 | 102 | string deviceType = 6; |
101 | 103 | string additionalInfo = 7; |
104 | + int64 deviceProfileIdMSB = 8; | |
105 | + int64 deviceProfileIdLSB = 9; | |
102 | 106 | } |
103 | 107 | |
104 | 108 | /** |
... | ... | @@ -147,6 +151,7 @@ message ValidateDeviceX509CertRequestMsg { |
147 | 151 | message ValidateDeviceCredentialsResponseMsg { |
148 | 152 | DeviceInfoProto deviceInfo = 1; |
149 | 153 | string credentialsBody = 2; |
154 | + bytes profileBody = 3; | |
150 | 155 | } |
151 | 156 | |
152 | 157 | message GetOrCreateDeviceFromGatewayRequestMsg { |
... | ... | @@ -158,6 +163,7 @@ message GetOrCreateDeviceFromGatewayRequestMsg { |
158 | 163 | |
159 | 164 | message GetOrCreateDeviceFromGatewayResponseMsg { |
160 | 165 | DeviceInfoProto deviceInfo = 1; |
166 | + bytes profileBody = 2; | |
161 | 167 | } |
162 | 168 | |
163 | 169 | message GetTenantRoutingInfoRequestMsg { |
... | ... | @@ -170,6 +176,19 @@ message GetTenantRoutingInfoResponseMsg { |
170 | 176 | bool isolatedTbRuleEngine = 2; |
171 | 177 | } |
172 | 178 | |
179 | +message GetDeviceProfileRequestMsg { | |
180 | + int64 profileIdMSB = 1; | |
181 | + int64 profileIdLSB = 2; | |
182 | +} | |
183 | + | |
184 | +message GetDeviceProfileResponseMsg { | |
185 | + bytes data = 1; | |
186 | +} | |
187 | + | |
188 | +message DeviceProfileUpdateMsg { | |
189 | + bytes data = 1; | |
190 | +} | |
191 | + | |
173 | 192 | message SessionCloseNotificationProto { |
174 | 193 | string message = 1; |
175 | 194 | } |
... | ... | @@ -399,6 +418,7 @@ message FromDeviceRPCResponseProto { |
399 | 418 | string response = 3; |
400 | 419 | int32 error = 4; |
401 | 420 | } |
421 | + | |
402 | 422 | /** |
403 | 423 | * Main messages; |
404 | 424 | */ |
... | ... | @@ -409,6 +429,7 @@ message TransportApiRequestMsg { |
409 | 429 | ValidateDeviceX509CertRequestMsg validateX509CertRequestMsg = 2; |
410 | 430 | GetOrCreateDeviceFromGatewayRequestMsg getOrCreateDeviceRequestMsg = 3; |
411 | 431 | GetTenantRoutingInfoRequestMsg getTenantRoutingInfoRequestMsg = 4; |
432 | + GetDeviceProfileRequestMsg getDeviceProfileRequestMsg = 5; | |
412 | 433 | } |
413 | 434 | |
414 | 435 | /* Response from ThingsBoard Core Service to Transport Service */ |
... | ... | @@ -416,6 +437,7 @@ message TransportApiResponseMsg { |
416 | 437 | ValidateDeviceCredentialsResponseMsg validateTokenResponseMsg = 1; |
417 | 438 | GetOrCreateDeviceFromGatewayResponseMsg getOrCreateDeviceResponseMsg = 2; |
418 | 439 | GetTenantRoutingInfoResponseMsg getTenantRoutingInfoResponseMsg = 4; |
440 | + GetDeviceProfileResponseMsg getDeviceProfileResponseMsg = 5; | |
419 | 441 | } |
420 | 442 | |
421 | 443 | /* Messages that are handled by ThingsBoard Core Service */ |
... | ... | @@ -456,4 +478,5 @@ message ToTransportMsg { |
456 | 478 | AttributeUpdateNotificationMsg attributeUpdateNotification = 5; |
457 | 479 | ToDeviceRpcRequestMsg toDeviceRequest = 6; |
458 | 480 | ToServerRpcResponseMsg toServerResponse = 7; |
481 | + DeviceProfileUpdateMsg deviceProfileUpdateMsg = 8; | |
459 | 482 | } | ... | ... |
... | ... | @@ -520,6 +520,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
520 | 520 | .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB()) |
521 | 521 | .setDeviceName(msg.getDeviceInfo().getDeviceName()) |
522 | 522 | .setDeviceType(msg.getDeviceInfo().getDeviceType()) |
523 | + .setDeviceProfileIdMSB(msg.getDeviceInfo().getDeviceProfileIdMSB()) | |
524 | + .setDeviceProfileIdLSB(msg.getDeviceInfo().getDeviceProfileIdLSB()) | |
523 | 525 | .build(); |
524 | 526 | transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() { |
525 | 527 | @Override | ... | ... |
... | ... | @@ -61,6 +61,10 @@ |
61 | 61 | <artifactId>gson</artifactId> |
62 | 62 | </dependency> |
63 | 63 | <dependency> |
64 | + <groupId>de.ruedigermoeller</groupId> | |
65 | + <artifactId>fst</artifactId> | |
66 | + </dependency> | |
67 | + <dependency> | |
64 | 68 | <groupId>org.slf4j</groupId> |
65 | 69 | <artifactId>slf4j-api</artifactId> |
66 | 70 | </dependency> | ... | ... |
... | ... | @@ -15,6 +15,7 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.common.transport; |
17 | 17 | |
18 | +import org.thingsboard.server.common.data.DeviceProfile; | |
18 | 19 | import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg; |
19 | 20 | import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; |
20 | 21 | import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg; |
... | ... | @@ -35,4 +36,8 @@ public interface SessionMsgListener { |
35 | 36 | void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest); |
36 | 37 | |
37 | 38 | void onToServerRpcResponse(ToServerRpcResponseMsg toServerResponse); |
39 | + | |
40 | + default void onProfileUpdate(DeviceProfile deviceProfile) { | |
41 | + } | |
42 | + | |
38 | 43 | } | ... | ... |
... | ... | @@ -15,6 +15,8 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.common.transport; |
17 | 17 | |
18 | +import org.thingsboard.server.common.data.DeviceProfile; | |
19 | +import org.thingsboard.server.common.data.id.DeviceProfileId; | |
18 | 20 | import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; |
19 | 21 | import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; |
20 | 22 | import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg; |
... | ... | @@ -50,6 +52,10 @@ public interface TransportService { |
50 | 52 | void process(GetOrCreateDeviceFromGatewayRequestMsg msg, |
51 | 53 | TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg> callback); |
52 | 54 | |
55 | + void getDeviceProfile(DeviceProfileId deviceProfileId, TransportServiceCallback<DeviceProfile> callback); | |
56 | + | |
57 | + void onProfileUpdate(DeviceProfile deviceProfile); | |
58 | + | |
53 | 59 | boolean checkLimits(SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback); |
54 | 60 | |
55 | 61 | void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback); | ... | ... |
... | ... | @@ -15,19 +15,26 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.common.transport.service; |
17 | 17 | |
18 | +import com.google.common.util.concurrent.Futures; | |
19 | +import com.google.common.util.concurrent.ListenableFuture; | |
18 | 20 | import com.google.gson.Gson; |
19 | 21 | import com.google.gson.JsonObject; |
22 | +import com.google.protobuf.ByteString; | |
20 | 23 | import lombok.extern.slf4j.Slf4j; |
21 | 24 | import org.springframework.beans.factory.annotation.Value; |
22 | 25 | import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
23 | 26 | import org.springframework.stereotype.Service; |
24 | 27 | import org.thingsboard.common.util.ThingsBoardThreadFactory; |
28 | +import org.thingsboard.server.common.data.DeviceProfile; | |
25 | 29 | import org.thingsboard.server.common.data.EntityType; |
26 | 30 | import org.thingsboard.server.common.data.id.DeviceId; |
31 | +import org.thingsboard.server.common.data.id.DeviceProfileId; | |
32 | +import org.thingsboard.server.common.data.id.RuleChainId; | |
27 | 33 | import org.thingsboard.server.common.data.id.TenantId; |
28 | 34 | import org.thingsboard.server.common.msg.TbMsg; |
29 | 35 | import org.thingsboard.server.common.msg.TbMsgDataType; |
30 | 36 | import org.thingsboard.server.common.msg.TbMsgMetaData; |
37 | +import org.thingsboard.server.common.msg.queue.ServiceQueue; | |
31 | 38 | import org.thingsboard.server.common.msg.queue.ServiceType; |
32 | 39 | import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
33 | 40 | import org.thingsboard.server.common.msg.session.SessionMsgType; |
... | ... | @@ -36,6 +43,7 @@ import org.thingsboard.server.common.msg.tools.TbRateLimitsException; |
36 | 43 | import org.thingsboard.server.common.transport.SessionMsgListener; |
37 | 44 | import org.thingsboard.server.common.transport.TransportService; |
38 | 45 | import org.thingsboard.server.common.transport.TransportServiceCallback; |
46 | +import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; | |
39 | 47 | import org.thingsboard.server.common.transport.util.JsonUtils; |
40 | 48 | import org.thingsboard.server.gen.transport.TransportProtos; |
41 | 49 | import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; |
... | ... | @@ -61,9 +69,11 @@ import org.thingsboard.server.common.stats.StatsType; |
61 | 69 | |
62 | 70 | import javax.annotation.PostConstruct; |
63 | 71 | import javax.annotation.PreDestroy; |
72 | +import java.util.Arrays; | |
64 | 73 | import java.util.Collections; |
65 | 74 | import java.util.List; |
66 | 75 | import java.util.Map; |
76 | +import java.util.Optional; | |
67 | 77 | import java.util.Random; |
68 | 78 | import java.util.UUID; |
69 | 79 | import java.util.concurrent.ConcurrentHashMap; |
... | ... | @@ -75,6 +85,7 @@ import java.util.concurrent.ScheduledExecutorService; |
75 | 85 | import java.util.concurrent.ScheduledFuture; |
76 | 86 | import java.util.concurrent.TimeUnit; |
77 | 87 | import java.util.concurrent.atomic.AtomicInteger; |
88 | +import java.util.function.Function; | |
78 | 89 | |
79 | 90 | /** |
80 | 91 | * Created by ashvayka on 17.10.18. |
... | ... | @@ -105,6 +116,8 @@ public class DefaultTransportService implements TransportService { |
105 | 116 | private final PartitionService partitionService; |
106 | 117 | private final TbServiceInfoProvider serviceInfoProvider; |
107 | 118 | private final StatsFactory statsFactory; |
119 | + private final DataDecodingEncodingService dataDecodingEncodingService; | |
120 | + | |
108 | 121 | |
109 | 122 | protected TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> transportApiRequestTemplate; |
110 | 123 | protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer; |
... | ... | @@ -120,19 +133,26 @@ public class DefaultTransportService implements TransportService { |
120 | 133 | |
121 | 134 | private final ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>(); |
122 | 135 | private final Map<String, RpcRequestMetadata> toServerRpcPendingMap = new ConcurrentHashMap<>(); |
123 | - //TODO: Implement cleanup of this maps. | |
136 | + //TODO 3.2: @ybondarenko Implement cleanup of this maps. | |
124 | 137 | private final ConcurrentMap<TenantId, TbRateLimits> perTenantLimits = new ConcurrentHashMap<>(); |
125 | 138 | private final ConcurrentMap<DeviceId, TbRateLimits> perDeviceLimits = new ConcurrentHashMap<>(); |
139 | + private final ConcurrentMap<DeviceProfileId, DeviceProfile> deviceProfiles = new ConcurrentHashMap<>(); | |
126 | 140 | |
127 | 141 | private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("transport-consumer")); |
128 | 142 | private volatile boolean stopped = false; |
129 | 143 | |
130 | - public DefaultTransportService(TbServiceInfoProvider serviceInfoProvider, TbTransportQueueFactory queueProvider, TbQueueProducerProvider producerProvider, PartitionService partitionService, StatsFactory statsFactory) { | |
144 | + public DefaultTransportService(TbServiceInfoProvider serviceInfoProvider, | |
145 | + TbTransportQueueFactory queueProvider, | |
146 | + TbQueueProducerProvider producerProvider, | |
147 | + PartitionService partitionService, | |
148 | + StatsFactory statsFactory, | |
149 | + DataDecodingEncodingService dataDecodingEncodingService) { | |
131 | 150 | this.serviceInfoProvider = serviceInfoProvider; |
132 | 151 | this.queueProvider = queueProvider; |
133 | 152 | this.producerProvider = producerProvider; |
134 | 153 | this.partitionService = partitionService; |
135 | 154 | this.statsFactory = statsFactory; |
155 | + this.dataDecodingEncodingService = dataDecodingEncodingService; | |
136 | 156 | } |
137 | 157 | |
138 | 158 | @PostConstruct |
... | ... | @@ -231,15 +251,22 @@ public class DefaultTransportService implements TransportService { |
231 | 251 | public void process(TransportProtos.ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback) { |
232 | 252 | log.trace("Processing msg: {}", msg); |
233 | 253 | TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()); |
234 | - AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg), | |
235 | - response -> callback.onSuccess(response.getValue().getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor); | |
254 | + process(callback, protoMsg); | |
236 | 255 | } |
237 | 256 | |
238 | 257 | @Override |
239 | 258 | public void process(TransportProtos.ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback) { |
240 | 259 | log.trace("Processing msg: {}", msg); |
241 | 260 | TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build()); |
242 | - AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg), | |
261 | + process(callback, protoMsg); | |
262 | + } | |
263 | + | |
264 | + private void process(TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback, TbProtoQueueMsg<TransportApiRequestMsg> protoMsg) { | |
265 | + ListenableFuture<TbProtoQueueMsg<TransportApiResponseMsg>> result = extractProfile(transportApiRequestTemplate.send(protoMsg), | |
266 | + response -> response.getValidateTokenResponseMsg().hasDeviceInfo(), | |
267 | + response -> response.getValidateTokenResponseMsg().getDeviceInfo(), | |
268 | + response -> response.getValidateTokenResponseMsg().getProfileBody()); | |
269 | + AsyncCallbackTemplate.withCallback(result, | |
243 | 270 | response -> callback.onSuccess(response.getValue().getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor); |
244 | 271 | } |
245 | 272 | |
... | ... | @@ -247,7 +274,11 @@ public class DefaultTransportService implements TransportService { |
247 | 274 | public void process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg msg, TransportServiceCallback<TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg> callback) { |
248 | 275 | log.trace("Processing msg: {}", msg); |
249 | 276 | TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(msg).build()); |
250 | - AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg), | |
277 | + ListenableFuture<TbProtoQueueMsg<TransportApiResponseMsg>> result = extractProfile(transportApiRequestTemplate.send(protoMsg), | |
278 | + response -> response.getGetOrCreateDeviceResponseMsg().hasDeviceInfo(), | |
279 | + response -> response.getGetOrCreateDeviceResponseMsg().getDeviceInfo(), | |
280 | + response -> response.getGetOrCreateDeviceResponseMsg().getProfileBody()); | |
281 | + AsyncCallbackTemplate.withCallback(result, | |
251 | 282 | response -> callback.onSuccess(response.getValue().getGetOrCreateDeviceResponseMsg()), callback::onError, transportCallbackExecutor); |
252 | 283 | } |
253 | 284 | |
... | ... | @@ -282,7 +313,9 @@ public class DefaultTransportService implements TransportService { |
282 | 313 | metaData.putValue("deviceType", sessionInfo.getDeviceType()); |
283 | 314 | metaData.putValue("ts", tsKv.getTs() + ""); |
284 | 315 | JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList()); |
285 | - TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, metaData, gson.toJson(json)); | |
316 | + RuleChainId ruleChainId = resolveRuleChainId(sessionInfo); | |
317 | + TbMsg tbMsg = TbMsg.newMsg(ServiceQueue.MAIN, SessionMsgType.POST_TELEMETRY_REQUEST.name(), | |
318 | + deviceId, metaData, gson.toJson(json), ruleChainId, null); | |
286 | 319 | sendToRuleEngine(tenantId, tbMsg, packCallback); |
287 | 320 | } |
288 | 321 | } |
... | ... | @@ -298,7 +331,9 @@ public class DefaultTransportService implements TransportService { |
298 | 331 | TbMsgMetaData metaData = new TbMsgMetaData(); |
299 | 332 | metaData.putValue("deviceName", sessionInfo.getDeviceName()); |
300 | 333 | metaData.putValue("deviceType", sessionInfo.getDeviceType()); |
301 | - TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, metaData, gson.toJson(json)); | |
334 | + RuleChainId ruleChainId = resolveRuleChainId(sessionInfo); | |
335 | + TbMsg tbMsg = TbMsg.newMsg(ServiceQueue.MAIN, SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), | |
336 | + deviceId, metaData, gson.toJson(json), ruleChainId, null); | |
302 | 337 | sendToRuleEngine(tenantId, tbMsg, new TransportTbQueueCallback(callback)); |
303 | 338 | } |
304 | 339 | } |
... | ... | @@ -380,9 +415,10 @@ public class DefaultTransportService implements TransportService { |
380 | 415 | metaData.putValue("requestId", Integer.toString(msg.getRequestId())); |
381 | 416 | metaData.putValue("serviceId", serviceInfoProvider.getServiceId()); |
382 | 417 | metaData.putValue("sessionId", sessionId.toString()); |
383 | - TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, metaData, TbMsgDataType.JSON, gson.toJson(json)); | |
418 | + RuleChainId ruleChainId = resolveRuleChainId(sessionInfo); | |
419 | + TbMsg tbMsg = TbMsg.newMsg(ServiceQueue.MAIN, SessionMsgType.TO_SERVER_RPC_REQUEST.name(), | |
420 | + deviceId, metaData, gson.toJson(json), ruleChainId, null); | |
384 | 421 | sendToRuleEngine(tenantId, tbMsg, new TransportTbQueueCallback(callback)); |
385 | - | |
386 | 422 | String requestId = sessionId + "-" + msg.getRequestId(); |
387 | 423 | toServerRpcPendingMap.put(requestId, new RpcRequestMetadata(sessionId, msg.getRequestId())); |
388 | 424 | schedulerExecutor.schedule(() -> processTimeout(requestId), clientSideRpcTimeout, TimeUnit.MILLISECONDS); |
... | ... | @@ -538,11 +574,62 @@ public class DefaultTransportService implements TransportService { |
538 | 574 | deregisterSession(md.getSessionInfo()); |
539 | 575 | } |
540 | 576 | } else { |
541 | - //TODO: should we notify the device actor about missed session? | |
542 | - log.debug("[{}] Missing session.", sessionId); | |
577 | + if (toSessionMsg.hasDeviceProfileUpdateMsg()) { | |
578 | + Optional<DeviceProfile> deviceProfile = dataDecodingEncodingService.decode(toSessionMsg.getDeviceProfileUpdateMsg().getData().toByteArray()); | |
579 | + deviceProfile.ifPresent(this::onProfileUpdate); | |
580 | + } else { | |
581 | + //TODO: should we notify the device actor about missed session? | |
582 | + log.debug("[{}] Missing session.", sessionId); | |
583 | + } | |
584 | + } | |
585 | + } | |
586 | + | |
587 | + @Override | |
588 | + public void getDeviceProfile(DeviceProfileId deviceProfileId, TransportServiceCallback<DeviceProfile> callback) { | |
589 | + DeviceProfile deviceProfile = deviceProfiles.get(deviceProfileId); | |
590 | + if (deviceProfile != null) { | |
591 | + callback.onSuccess(deviceProfile); | |
592 | + } else { | |
593 | + log.trace("Processing device profile request: [{}]", deviceProfileId); | |
594 | + TransportProtos.GetDeviceProfileRequestMsg msg = TransportProtos.GetDeviceProfileRequestMsg.newBuilder() | |
595 | + .setProfileIdMSB(deviceProfileId.getId().getMostSignificantBits()) | |
596 | + .setProfileIdLSB(deviceProfileId.getId().getLeastSignificantBits()) | |
597 | + .build(); | |
598 | + TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), | |
599 | + TransportApiRequestMsg.newBuilder().setGetDeviceProfileRequestMsg(msg).build()); | |
600 | + AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg), | |
601 | + response -> { | |
602 | + byte[] devProfileBody = response.getValue().getGetDeviceProfileResponseMsg().getData().toByteArray(); | |
603 | + if (devProfileBody != null && devProfileBody.length > 0) { | |
604 | + Optional<DeviceProfile> deviceProfileOpt = dataDecodingEncodingService.decode(devProfileBody); | |
605 | + if (deviceProfileOpt.isPresent()) { | |
606 | + deviceProfiles.put(deviceProfileOpt.get().getId(), deviceProfile); | |
607 | + callback.onSuccess(deviceProfileOpt.get()); | |
608 | + } else { | |
609 | + log.warn("Failed to decode device profile: {}", Arrays.toString(devProfileBody)); | |
610 | + callback.onError(new IllegalArgumentException("Failed to decode device profile!")); | |
611 | + } | |
612 | + } else { | |
613 | + log.warn("Failed to find device profile: [{}]", deviceProfileId); | |
614 | + callback.onError(new IllegalArgumentException("Failed to find device profile!")); | |
615 | + } | |
616 | + }, callback::onError, transportCallbackExecutor); | |
543 | 617 | } |
544 | 618 | } |
545 | 619 | |
620 | + @Override | |
621 | + public void onProfileUpdate(DeviceProfile deviceProfile) { | |
622 | + deviceProfiles.put(deviceProfile.getId(), deviceProfile); | |
623 | + long deviceProfileIdMSB = deviceProfile.getId().getId().getMostSignificantBits(); | |
624 | + long deviceProfileIdLSB = deviceProfile.getId().getId().getLeastSignificantBits(); | |
625 | + sessions.forEach((id, md) -> { | |
626 | + if (md.getSessionInfo().getDeviceProfileIdMSB() == deviceProfileIdMSB | |
627 | + && md.getSessionInfo().getDeviceProfileIdLSB() == deviceProfileIdLSB) { | |
628 | + transportCallbackExecutor.submit(() -> md.getListener().onProfileUpdate(deviceProfile)); | |
629 | + } | |
630 | + }); | |
631 | + } | |
632 | + | |
546 | 633 | protected UUID toSessionId(TransportProtos.SessionInfoProto sessionInfo) { |
547 | 634 | return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); |
548 | 635 | } |
... | ... | @@ -593,6 +680,40 @@ public class DefaultTransportService implements TransportService { |
593 | 680 | ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), wrappedCallback); |
594 | 681 | } |
595 | 682 | |
683 | + private RuleChainId resolveRuleChainId(TransportProtos.SessionInfoProto sessionInfo) { | |
684 | + DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(sessionInfo.getDeviceProfileIdMSB(), sessionInfo.getDeviceProfileIdLSB())); | |
685 | + DeviceProfile deviceProfile = deviceProfiles.get(deviceProfileId); | |
686 | + RuleChainId ruleChainId; | |
687 | + if (deviceProfile == null) { | |
688 | + log.warn("[{}] Device profile is null!", deviceProfileId); | |
689 | + ruleChainId = null; | |
690 | + } else { | |
691 | + ruleChainId = deviceProfile.getDefaultRuleChainId(); | |
692 | + } | |
693 | + return ruleChainId; | |
694 | + } | |
695 | + | |
696 | + private <T extends com.google.protobuf.GeneratedMessageV3> ListenableFuture<TbProtoQueueMsg<T>> extractProfile(ListenableFuture<TbProtoQueueMsg<T>> send, | |
697 | + Function<T, Boolean> hasDeviceInfo, | |
698 | + Function<T, TransportProtos.DeviceInfoProto> deviceInfoF, | |
699 | + Function<T, ByteString> profileBodyF) { | |
700 | + return Futures.transform(send, response -> { | |
701 | + T value = response.getValue(); | |
702 | + if (hasDeviceInfo.apply(value)) { | |
703 | + TransportProtos.DeviceInfoProto deviceInfo = deviceInfoF.apply(value); | |
704 | + ByteString profileBody = profileBodyF.apply(value); | |
705 | + if (profileBody != null && !profileBody.isEmpty()) { | |
706 | + DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(deviceInfo.getDeviceProfileIdMSB(), deviceInfo.getDeviceProfileIdLSB())); | |
707 | + if (!deviceProfiles.containsKey(deviceProfileId)) { | |
708 | + Optional<DeviceProfile> deviceProfile = dataDecodingEncodingService.decode(profileBody.toByteArray()); | |
709 | + deviceProfile.ifPresent(profile -> deviceProfiles.put(deviceProfileId, profile)); | |
710 | + } | |
711 | + } | |
712 | + } | |
713 | + return response; | |
714 | + }, transportCallbackExecutor); | |
715 | + } | |
716 | + | |
596 | 717 | private class TransportTbQueueCallback implements TbQueueCallback { |
597 | 718 | private final TransportServiceCallback<Void> callback; |
598 | 719 | ... | ... |
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/util/DataDecodingEncodingService.java
renamed from
application/src/main/java/org/thingsboard/server/service/encoding/DataDecodingEncodingService.java
... | ... | @@ -13,7 +13,7 @@ |
13 | 13 | * See the License for the specific language governing permissions and |
14 | 14 | * limitations under the License. |
15 | 15 | */ |
16 | -package org.thingsboard.server.service.encoding; | |
16 | +package org.thingsboard.server.common.transport.util; | |
17 | 17 | |
18 | 18 | import org.thingsboard.server.common.msg.TbActorMsg; |
19 | 19 | |
... | ... | @@ -21,9 +21,9 @@ import java.util.Optional; |
21 | 21 | |
22 | 22 | public interface DataDecodingEncodingService { |
23 | 23 | |
24 | - Optional<TbActorMsg> decode(byte[] byteArray); | |
24 | + <T> Optional<T> decode(byte[] byteArray); | |
25 | 25 | |
26 | - byte[] encode(TbActorMsg msq); | |
26 | + <T> byte[] encode(T msq); | |
27 | 27 | |
28 | 28 | } |
29 | 29 | ... | ... |
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/util/ProtoWithFSTService.java
renamed from
application/src/main/java/org/thingsboard/server/service/encoding/ProtoWithFSTService.java
... | ... | @@ -13,12 +13,13 @@ |
13 | 13 | * See the License for the specific language governing permissions and |
14 | 14 | * limitations under the License. |
15 | 15 | */ |
16 | -package org.thingsboard.server.service.encoding; | |
16 | +package org.thingsboard.server.common.transport.util; | |
17 | 17 | |
18 | 18 | import lombok.extern.slf4j.Slf4j; |
19 | 19 | import org.nustaq.serialization.FSTConfiguration; |
20 | 20 | import org.springframework.stereotype.Service; |
21 | 21 | import org.thingsboard.server.common.msg.TbActorMsg; |
22 | +import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; | |
22 | 23 | |
23 | 24 | import java.util.Optional; |
24 | 25 | |
... | ... | @@ -29,11 +30,10 @@ public class ProtoWithFSTService implements DataDecodingEncodingService { |
29 | 30 | private final FSTConfiguration config = FSTConfiguration.createDefaultConfiguration(); |
30 | 31 | |
31 | 32 | @Override |
32 | - public Optional<TbActorMsg> decode(byte[] byteArray) { | |
33 | + public <T> Optional<T> decode(byte[] byteArray) { | |
33 | 34 | try { |
34 | - TbActorMsg msg = (TbActorMsg) config.asObject(byteArray); | |
35 | + T msg = (T) config.asObject(byteArray); | |
35 | 36 | return Optional.of(msg); |
36 | - | |
37 | 37 | } catch (IllegalArgumentException e) { |
38 | 38 | log.error("Error during deserialization message, [{}]", e.getMessage()); |
39 | 39 | return Optional.empty(); |
... | ... | @@ -41,7 +41,7 @@ public class ProtoWithFSTService implements DataDecodingEncodingService { |
41 | 41 | } |
42 | 42 | |
43 | 43 | @Override |
44 | - public byte[] encode(TbActorMsg msq) { | |
44 | + public <T> byte[] encode(T msq) { | |
45 | 45 | return config.asByteArray(msq); |
46 | 46 | } |
47 | 47 | ... | ... |
... | ... | @@ -26,6 +26,7 @@ import org.springframework.stereotype.Service; |
26 | 26 | import org.thingsboard.server.common.data.DeviceProfile; |
27 | 27 | import org.thingsboard.server.common.data.DeviceProfileInfo; |
28 | 28 | import org.thingsboard.server.common.data.DeviceProfileType; |
29 | +import org.thingsboard.server.common.data.DeviceTransportType; | |
29 | 30 | import org.thingsboard.server.common.data.Tenant; |
30 | 31 | import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileConfiguration; |
31 | 32 | import org.thingsboard.server.common.data.device.profile.DeviceProfileData; |
... | ... | @@ -166,6 +167,7 @@ public class DeviceProfileServiceImpl extends AbstractEntityService implements D |
166 | 167 | deviceProfile.setDefault(true); |
167 | 168 | deviceProfile.setName("Default"); |
168 | 169 | deviceProfile.setType(DeviceProfileType.DEFAULT); |
170 | + deviceProfile.setTransportType(DeviceTransportType.DEFAULT); | |
169 | 171 | deviceProfile.setDescription("Default device profile"); |
170 | 172 | DeviceProfileData deviceProfileData = new DeviceProfileData(); |
171 | 173 | DefaultDeviceProfileConfiguration configuration = new DefaultDeviceProfileConfiguration(); | ... | ... |
... | ... | @@ -41,8 +41,10 @@ import org.thingsboard.server.common.data.EntityView; |
41 | 41 | import org.thingsboard.server.common.data.Tenant; |
42 | 42 | import org.thingsboard.server.common.data.device.DeviceSearchQuery; |
43 | 43 | import org.thingsboard.server.common.data.device.data.DefaultDeviceConfiguration; |
44 | +import org.thingsboard.server.common.data.device.data.DefaultDeviceTransportConfiguration; | |
44 | 45 | import org.thingsboard.server.common.data.device.data.DeviceData; |
45 | -import org.thingsboard.server.common.data.device.data.Lwm2mDeviceConfiguration; | |
46 | +import org.thingsboard.server.common.data.device.data.Lwm2mDeviceTransportConfiguration; | |
47 | +import org.thingsboard.server.common.data.device.data.MqttDeviceTransportConfiguration; | |
46 | 48 | import org.thingsboard.server.common.data.id.CustomerId; |
47 | 49 | import org.thingsboard.server.common.data.id.DeviceId; |
48 | 50 | import org.thingsboard.server.common.data.id.DeviceProfileId; |
... | ... | @@ -175,8 +177,14 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe |
175 | 177 | case DEFAULT: |
176 | 178 | deviceData.setConfiguration(new DefaultDeviceConfiguration()); |
177 | 179 | break; |
180 | + } | |
181 | + switch (deviceProfile.getTransportType()){ | |
182 | + case DEFAULT: | |
183 | + deviceData.setTransportConfiguration(new DefaultDeviceTransportConfiguration()); | |
184 | + case MQTT: | |
185 | + deviceData.setTransportConfiguration(new MqttDeviceTransportConfiguration()); | |
178 | 186 | case LWM2M: |
179 | - deviceData.setConfiguration(new Lwm2mDeviceConfiguration()); | |
187 | + deviceData.setTransportConfiguration(new Lwm2mDeviceTransportConfiguration()); | |
180 | 188 | break; |
181 | 189 | } |
182 | 190 | device.setDeviceData(deviceData); | ... | ... |
... | ... | @@ -168,6 +168,7 @@ public class ModelConstants { |
168 | 168 | public static final String DEVICE_PROFILE_TENANT_ID_PROPERTY = TENANT_ID_PROPERTY; |
169 | 169 | public static final String DEVICE_PROFILE_NAME_PROPERTY = "name"; |
170 | 170 | public static final String DEVICE_PROFILE_TYPE_PROPERTY = "type"; |
171 | + public static final String DEVICE_PROFILE_TRANSPORT_TYPE_PROPERTY = "transport_type"; | |
171 | 172 | public static final String DEVICE_PROFILE_PROFILE_DATA_PROPERTY = "profile_data"; |
172 | 173 | public static final String DEVICE_PROFILE_DESCRIPTION_PROPERTY = "description"; |
173 | 174 | public static final String DEVICE_PROFILE_IS_DEFAULT_PROPERTY = "is_default"; | ... | ... |
... | ... | @@ -23,6 +23,7 @@ import org.hibernate.annotations.Type; |
23 | 23 | import org.hibernate.annotations.TypeDef; |
24 | 24 | import org.thingsboard.server.common.data.DeviceProfile; |
25 | 25 | import org.thingsboard.server.common.data.DeviceProfileType; |
26 | +import org.thingsboard.server.common.data.DeviceTransportType; | |
26 | 27 | import org.thingsboard.server.common.data.device.profile.DeviceProfileData; |
27 | 28 | import org.thingsboard.server.common.data.id.DeviceProfileId; |
28 | 29 | import org.thingsboard.server.common.data.id.RuleChainId; |
... | ... | @@ -57,6 +58,10 @@ public final class DeviceProfileEntity extends BaseSqlEntity<DeviceProfile> impl |
57 | 58 | @Column(name = ModelConstants.DEVICE_PROFILE_TYPE_PROPERTY) |
58 | 59 | private DeviceProfileType type; |
59 | 60 | |
61 | + @Enumerated(EnumType.STRING) | |
62 | + @Column(name = ModelConstants.DEVICE_PROFILE_TRANSPORT_TYPE_PROPERTY) | |
63 | + private DeviceTransportType transportType; | |
64 | + | |
60 | 65 | @Column(name = ModelConstants.DEVICE_PROFILE_DESCRIPTION_PROPERTY) |
61 | 66 | private String description; |
62 | 67 | |
... | ... | @@ -87,6 +92,7 @@ public final class DeviceProfileEntity extends BaseSqlEntity<DeviceProfile> impl |
87 | 92 | this.setCreatedTime(deviceProfile.getCreatedTime()); |
88 | 93 | this.name = deviceProfile.getName(); |
89 | 94 | this.type = deviceProfile.getType(); |
95 | + this.transportType = deviceProfile.getTransportType(); | |
90 | 96 | this.description = deviceProfile.getDescription(); |
91 | 97 | this.isDefault = deviceProfile.isDefault(); |
92 | 98 | this.profileData = JacksonUtil.convertValue(deviceProfile.getProfileData(), ObjectNode.class); |
... | ... | @@ -118,6 +124,7 @@ public final class DeviceProfileEntity extends BaseSqlEntity<DeviceProfile> impl |
118 | 124 | } |
119 | 125 | deviceProfile.setName(name); |
120 | 126 | deviceProfile.setType(type); |
127 | + deviceProfile.setTransportType(transportType); | |
121 | 128 | deviceProfile.setDescription(description); |
122 | 129 | deviceProfile.setDefault(isDefault); |
123 | 130 | deviceProfile.setProfileData(JacksonUtil.convertValue(profileData, DeviceProfileData.class)); | ... | ... |
... | ... | @@ -27,7 +27,7 @@ import java.util.UUID; |
27 | 27 | |
28 | 28 | public interface DeviceProfileRepository extends PagingAndSortingRepository<DeviceProfileEntity, UUID> { |
29 | 29 | |
30 | - @Query("SELECT new org.thingsboard.server.common.data.DeviceProfileInfo(d.id, d.name, d.type) " + | |
30 | + @Query("SELECT new org.thingsboard.server.common.data.DeviceProfileInfo(d.id, d.name, d.type, d.transportType) " + | |
31 | 31 | "FROM DeviceProfileEntity d " + |
32 | 32 | "WHERE d.id = :deviceProfileId") |
33 | 33 | DeviceProfileInfo findDeviceProfileInfoById(@Param("deviceProfileId") UUID deviceProfileId); |
... | ... | @@ -38,7 +38,7 @@ public interface DeviceProfileRepository extends PagingAndSortingRepository<Devi |
38 | 38 | @Param("textSearch") String textSearch, |
39 | 39 | Pageable pageable); |
40 | 40 | |
41 | - @Query("SELECT new org.thingsboard.server.common.data.DeviceProfileInfo(d.id, d.name, d.type) " + | |
41 | + @Query("SELECT new org.thingsboard.server.common.data.DeviceProfileInfo(d.id, d.name, d.type, d.transportType) " + | |
42 | 42 | "FROM DeviceProfileEntity d WHERE " + |
43 | 43 | "d.tenantId = :tenantId AND LOWER(d.searchText) LIKE LOWER(CONCAT(:textSearch, '%'))") |
44 | 44 | Page<DeviceProfileInfo> findDeviceProfileInfos(@Param("tenantId") UUID tenantId, |
... | ... | @@ -49,7 +49,7 @@ public interface DeviceProfileRepository extends PagingAndSortingRepository<Devi |
49 | 49 | "WHERE d.tenantId = :tenantId AND d.isDefault = true") |
50 | 50 | DeviceProfileEntity findByDefaultTrueAndTenantId(@Param("tenantId") UUID tenantId); |
51 | 51 | |
52 | - @Query("SELECT new org.thingsboard.server.common.data.DeviceProfileInfo(d.id, d.name, d.type) " + | |
52 | + @Query("SELECT new org.thingsboard.server.common.data.DeviceProfileInfo(d.id, d.name, d.type, d.transportType) " + | |
53 | 53 | "FROM DeviceProfileEntity d " + |
54 | 54 | "WHERE d.tenantId = :tenantId AND d.isDefault = true") |
55 | 55 | DeviceProfileInfo findDefaultDeviceProfileInfo(@Param("tenantId") UUID tenantId); | ... | ... |
... | ... | @@ -139,12 +139,12 @@ CREATE TABLE IF NOT EXISTS dashboard ( |
139 | 139 | title varchar(255) |
140 | 140 | ); |
141 | 141 | |
142 | - | |
143 | 142 | CREATE TABLE IF NOT EXISTS device_profile ( |
144 | 143 | id uuid NOT NULL CONSTRAINT device_profile_pkey PRIMARY KEY, |
145 | 144 | created_time bigint NOT NULL, |
146 | 145 | name varchar(255), |
147 | 146 | type varchar(255), |
147 | + transport_type varchar(255), | |
148 | 148 | profile_data varchar, |
149 | 149 | description varchar, |
150 | 150 | search_text varchar(255), | ... | ... |
... | ... | @@ -18,6 +18,7 @@ package org.thingsboard.server.dao.service; |
18 | 18 | import org.junit.After; |
19 | 19 | import org.junit.Assert; |
20 | 20 | import org.junit.Before; |
21 | +import org.junit.Ignore; | |
21 | 22 | import org.junit.Test; |
22 | 23 | import org.thingsboard.server.common.data.Device; |
23 | 24 | import org.thingsboard.server.common.data.DeviceProfile; |
... | ... | @@ -145,11 +146,13 @@ public class BaseDeviceProfileServiceTest extends AbstractServiceTest { |
145 | 146 | deviceProfileService.saveDeviceProfile(deviceProfile2); |
146 | 147 | } |
147 | 148 | |
149 | + @Ignore | |
148 | 150 | @Test(expected = DataValidationException.class) |
149 | 151 | public void testSaveSameDeviceProfileWithDifferentType() { |
150 | 152 | DeviceProfile deviceProfile = this.createDeviceProfile(tenantId,"Device Profile"); |
151 | 153 | DeviceProfile savedDeviceProfile = deviceProfileService.saveDeviceProfile(deviceProfile); |
152 | - savedDeviceProfile.setType(DeviceProfileType.LWM2M); | |
154 | + //TODO: once we have mode profile types, we should test that we can not change profile type in runtime and uncomment the @Ignore. | |
155 | +// savedDeviceProfile.setType(DeviceProfileType.LWM2M); | |
153 | 156 | deviceProfileService.saveDeviceProfile(savedDeviceProfile); |
154 | 157 | } |
155 | 158 | |
... | ... | @@ -247,8 +250,9 @@ public class BaseDeviceProfileServiceTest extends AbstractServiceTest { |
247 | 250 | Collections.sort(deviceProfiles, idComparator); |
248 | 251 | Collections.sort(loadedDeviceProfileInfos, deviceProfileInfoIdComparator); |
249 | 252 | |
250 | - List<DeviceProfileInfo> deviceProfileInfos = deviceProfiles.stream().map(deviceProfile -> new DeviceProfileInfo(deviceProfile.getId(), | |
251 | - deviceProfile.getName(), deviceProfile.getType())).collect(Collectors.toList()); | |
253 | + List<DeviceProfileInfo> deviceProfileInfos = deviceProfiles.stream() | |
254 | + .map(deviceProfile -> new DeviceProfileInfo(deviceProfile.getId(), | |
255 | + deviceProfile.getName(), deviceProfile.getType(), deviceProfile.getTransportType())).collect(Collectors.toList()); | |
252 | 256 | |
253 | 257 | Assert.assertEquals(deviceProfileInfos, loadedDeviceProfileInfos); |
254 | 258 | ... | ... |