Commit 0245ff24a8151dc5a39452cde2402ec74b09a0c1

Authored by Andrii Shvaika
1 parent e67946e4

Transport Profile cache

  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.transport;
  17 +
  18 +import com.google.protobuf.ByteString;
  19 +import org.thingsboard.server.common.data.DeviceProfile;
  20 +import org.thingsboard.server.common.data.id.DeviceProfileId;
  21 +
  22 +import java.util.Optional;
  23 +
  24 +public interface TransportProfileCache {
  25 +
  26 +
  27 + DeviceProfile getOrCreate(DeviceProfileId id, ByteString profileBody);
  28 +
  29 + DeviceProfile get(DeviceProfileId id);
  30 +
  31 + void put(DeviceProfile profile);
  32 +
  33 + DeviceProfile put(ByteString profileBody);
  34 +
  35 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.transport.service;
  17 +
  18 +import com.google.protobuf.ByteString;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  21 +import org.springframework.stereotype.Component;
  22 +import org.thingsboard.server.common.data.DeviceProfile;
  23 +import org.thingsboard.server.common.data.id.DeviceProfileId;
  24 +import org.thingsboard.server.common.transport.TransportProfileCache;
  25 +import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
  26 +
  27 +import java.util.Optional;
  28 +import java.util.concurrent.ConcurrentHashMap;
  29 +import java.util.concurrent.ConcurrentMap;
  30 +
  31 +@Slf4j
  32 +@Component
  33 +@ConditionalOnExpression("('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='tb-transport'")
  34 +public class DefaultTransportProfileCache implements TransportProfileCache {
  35 +
  36 + private final ConcurrentMap<DeviceProfileId, DeviceProfile> deviceProfiles = new ConcurrentHashMap<>();
  37 +
  38 + private final DataDecodingEncodingService dataDecodingEncodingService;
  39 +
  40 + public DefaultTransportProfileCache(DataDecodingEncodingService dataDecodingEncodingService) {
  41 + this.dataDecodingEncodingService = dataDecodingEncodingService;
  42 + }
  43 +
  44 + @Override
  45 + public DeviceProfile getOrCreate(DeviceProfileId id, ByteString profileBody) {
  46 + DeviceProfile profile = deviceProfiles.get(id);
  47 + if (profile == null) {
  48 + Optional<DeviceProfile> deviceProfile = dataDecodingEncodingService.decode(profileBody.toByteArray());
  49 + if (deviceProfile.isPresent()) {
  50 + profile = deviceProfile.get();
  51 + deviceProfiles.put(id, profile);
  52 + }
  53 + }
  54 + return profile;
  55 + }
  56 +
  57 + @Override
  58 + public DeviceProfile get(DeviceProfileId id) {
  59 + return deviceProfiles.get(id);
  60 + }
  61 +
  62 + @Override
  63 + public void put(DeviceProfile profile) {
  64 + deviceProfiles.put(profile.getId(), profile);
  65 + }
  66 +
  67 + @Override
  68 + public DeviceProfile put(ByteString profileBody) {
  69 + Optional<DeviceProfile> deviceProfile = dataDecodingEncodingService.decode(profileBody.toByteArray());
  70 + if (deviceProfile.isPresent()) {
  71 + put(deviceProfile.get());
  72 + return deviceProfile.get();
  73 + } else {
  74 + return null;
  75 + }
  76 + }
  77 +}
... ...
... ... @@ -43,6 +43,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
43 43 import org.thingsboard.server.common.msg.tools.TbRateLimits;
44 44 import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
45 45 import org.thingsboard.server.common.transport.SessionMsgListener;
  46 +import org.thingsboard.server.common.transport.TransportProfileCache;
46 47 import org.thingsboard.server.common.transport.TransportService;
47 48 import org.thingsboard.server.common.transport.TransportServiceCallback;
48 49 import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
... ... @@ -121,8 +122,7 @@ public class DefaultTransportService implements TransportService {
121 122 private final PartitionService partitionService;
122 123 private final TbServiceInfoProvider serviceInfoProvider;
123 124 private final StatsFactory statsFactory;
124   - private final DataDecodingEncodingService dataDecodingEncodingService;
125   -
  125 + private final TransportProfileCache transportProfileCache;
126 126
127 127 protected TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> transportApiRequestTemplate;
128 128 protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer;
... ... @@ -141,7 +141,6 @@ public class DefaultTransportService implements TransportService {
141 141 //TODO 3.2: @ybondarenko Implement cleanup of this maps.
142 142 private final ConcurrentMap<TenantId, TbRateLimits> perTenantLimits = new ConcurrentHashMap<>();
143 143 private final ConcurrentMap<DeviceId, TbRateLimits> perDeviceLimits = new ConcurrentHashMap<>();
144   - private final ConcurrentMap<DeviceProfileId, DeviceProfile> deviceProfiles = new ConcurrentHashMap<>();
145 144
146 145 private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("transport-consumer"));
147 146 private volatile boolean stopped = false;
... ... @@ -151,13 +150,13 @@ public class DefaultTransportService implements TransportService {
151 150 TbQueueProducerProvider producerProvider,
152 151 PartitionService partitionService,
153 152 StatsFactory statsFactory,
154   - DataDecodingEncodingService dataDecodingEncodingService) {
  153 + TransportProfileCache transportProfileCache) {
155 154 this.serviceInfoProvider = serviceInfoProvider;
156 155 this.queueProvider = queueProvider;
157 156 this.producerProvider = producerProvider;
158 157 this.partitionService = partitionService;
159 158 this.statsFactory = statsFactory;
160   - this.dataDecodingEncodingService = dataDecodingEncodingService;
  159 + this.transportProfileCache = transportProfileCache;
161 160 }
162 161
163 162 @PostConstruct
... ... @@ -276,14 +275,7 @@ public class DefaultTransportService implements TransportService {
276 275 result.deviceInfo(tdi);
277 276 ByteString profileBody = msg.getProfileBody();
278 277 if (profileBody != null && !profileBody.isEmpty()) {
279   - DeviceProfile profile = deviceProfiles.get(tdi.getDeviceProfileId());
280   - if (profile == null) {
281   - Optional<DeviceProfile> deviceProfile = dataDecodingEncodingService.decode(profileBody.toByteArray());
282   - if (deviceProfile.isPresent()) {
283   - profile = deviceProfile.get();
284   - deviceProfiles.put(tdi.getDeviceProfileId(), profile);
285   - }
286   - }
  278 + DeviceProfile profile = transportProfileCache.getOrCreate(tdi.getDeviceProfileId(), profileBody);
287 279 if (transportType != DeviceTransportType.DEFAULT
288 280 && profile != null && profile.getTransportType() != DeviceTransportType.DEFAULT && profile.getTransportType() != transportType) {
289 281 log.debug("[{}] Device profile [{}] has different transport type: {}, expected: {}", tdi.getDeviceId(), tdi.getDeviceProfileId(), profile.getTransportType(), transportType);
... ... @@ -309,15 +301,7 @@ public class DefaultTransportService implements TransportService {
309 301 result.deviceInfo(tdi);
310 302 ByteString profileBody = msg.getProfileBody();
311 303 if (profileBody != null && !profileBody.isEmpty()) {
312   - DeviceProfile profile = deviceProfiles.get(tdi.getDeviceProfileId());
313   - if (profile == null) {
314   - Optional<DeviceProfile> deviceProfile = dataDecodingEncodingService.decode(profileBody.toByteArray());
315   - if (deviceProfile.isPresent()) {
316   - profile = deviceProfile.get();
317   - deviceProfiles.put(tdi.getDeviceProfileId(), profile);
318   - }
319   - }
320   - result.deviceProfile(profile);
  304 + result.deviceProfile(transportProfileCache.getOrCreate(tdi.getDeviceProfileId(), profileBody));
321 305 }
322 306 }
323 307 return result.build();
... ... @@ -629,8 +613,10 @@ public class DefaultTransportService implements TransportService {
629 613 }
630 614 } else {
631 615 if (toSessionMsg.hasDeviceProfileUpdateMsg()) {
632   - Optional<DeviceProfile> deviceProfile = dataDecodingEncodingService.decode(toSessionMsg.getDeviceProfileUpdateMsg().getData().toByteArray());
633   - deviceProfile.ifPresent(this::onProfileUpdate);
  616 + DeviceProfile deviceProfile = transportProfileCache.put(toSessionMsg.getDeviceProfileUpdateMsg().getData());
  617 + if (deviceProfile != null) {
  618 + onProfileUpdate(deviceProfile);
  619 + }
634 620 } else {
635 621 //TODO: should we notify the device actor about missed session?
636 622 log.debug("[{}] Missing session.", sessionId);
... ... @@ -640,7 +626,7 @@ public class DefaultTransportService implements TransportService {
640 626
641 627 @Override
642 628 public void getDeviceProfile(DeviceProfileId deviceProfileId, TransportServiceCallback<DeviceProfile> callback) {
643   - DeviceProfile deviceProfile = deviceProfiles.get(deviceProfileId);
  629 + DeviceProfile deviceProfile = transportProfileCache.get(deviceProfileId);
644 630 if (deviceProfile != null) {
645 631 callback.onSuccess(deviceProfile);
646 632 } else {
... ... @@ -653,14 +639,13 @@ public class DefaultTransportService implements TransportService {
653 639 TransportApiRequestMsg.newBuilder().setGetDeviceProfileRequestMsg(msg).build());
654 640 AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg),
655 641 response -> {
656   - byte[] devProfileBody = response.getValue().getGetDeviceProfileResponseMsg().getData().toByteArray();
657   - if (devProfileBody != null && devProfileBody.length > 0) {
658   - Optional<DeviceProfile> deviceProfileOpt = dataDecodingEncodingService.decode(devProfileBody);
659   - if (deviceProfileOpt.isPresent()) {
660   - deviceProfiles.put(deviceProfileOpt.get().getId(), deviceProfile);
661   - callback.onSuccess(deviceProfileOpt.get());
  642 + ByteString devProfileBody = response.getValue().getGetDeviceProfileResponseMsg().getData();
  643 + if (devProfileBody != null && !devProfileBody.isEmpty()) {
  644 + DeviceProfile profile = transportProfileCache.put(devProfileBody);
  645 + if (profile != null) {
  646 + callback.onSuccess(profile);
662 647 } else {
663   - log.warn("Failed to decode device profile: {}", Arrays.toString(devProfileBody));
  648 + log.warn("Failed to decode device profile: {}", devProfileBody);
664 649 callback.onError(new IllegalArgumentException("Failed to decode device profile!"));
665 650 }
666 651 } else {
... ... @@ -673,7 +658,6 @@ public class DefaultTransportService implements TransportService {
673 658
674 659 @Override
675 660 public void onProfileUpdate(DeviceProfile deviceProfile) {
676   - deviceProfiles.put(deviceProfile.getId(), deviceProfile);
677 661 long deviceProfileIdMSB = deviceProfile.getId().getId().getMostSignificantBits();
678 662 long deviceProfileIdLSB = deviceProfile.getId().getId().getLeastSignificantBits();
679 663 sessions.forEach((id, md) -> {
... ... @@ -736,7 +720,7 @@ public class DefaultTransportService implements TransportService {
736 720
737 721 private RuleChainId resolveRuleChainId(TransportProtos.SessionInfoProto sessionInfo) {
738 722 DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(sessionInfo.getDeviceProfileIdMSB(), sessionInfo.getDeviceProfileIdLSB()));
739   - DeviceProfile deviceProfile = deviceProfiles.get(deviceProfileId);
  723 + DeviceProfile deviceProfile = transportProfileCache.get(deviceProfileId);
740 724 RuleChainId ruleChainId;
741 725 if (deviceProfile == null) {
742 726 log.warn("[{}] Device profile is null!", deviceProfileId);
... ... @@ -747,27 +731,6 @@ public class DefaultTransportService implements TransportService {
747 731 return ruleChainId;
748 732 }
749 733
750   - private <T extends com.google.protobuf.GeneratedMessageV3> ListenableFuture<TbProtoQueueMsg<T>> extractProfile(ListenableFuture<TbProtoQueueMsg<T>> send,
751   - Function<T, Boolean> hasDeviceInfo,
752   - Function<T, TransportProtos.DeviceInfoProto> deviceInfoF,
753   - Function<T, ByteString> profileBodyF) {
754   - return Futures.transform(send, response -> {
755   - T value = response.getValue();
756   - if (hasDeviceInfo.apply(value)) {
757   - TransportProtos.DeviceInfoProto deviceInfo = deviceInfoF.apply(value);
758   - ByteString profileBody = profileBodyF.apply(value);
759   - if (profileBody != null && !profileBody.isEmpty()) {
760   - DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(deviceInfo.getDeviceProfileIdMSB(), deviceInfo.getDeviceProfileIdLSB()));
761   - if (!deviceProfiles.containsKey(deviceProfileId)) {
762   - Optional<DeviceProfile> deviceProfile = dataDecodingEncodingService.decode(profileBody.toByteArray());
763   - deviceProfile.ifPresent(profile -> deviceProfiles.put(deviceProfileId, profile));
764   - }
765   - }
766   - }
767   - return response;
768   - }, transportCallbackExecutor);
769   - }
770   -
771 734 private class TransportTbQueueCallback implements TbQueueCallback {
772 735 private final TransportServiceCallback<Void> callback;
773 736
... ...