Commit 8468c33e14e7ff4a34d8f460f38dda5d331a2665

Authored by Igor Kulikov
2 parents e1afbe66 4ce9e096

Merge branch 'master' of github.com:thingsboard/thingsboard

Showing 15 changed files with 111 additions and 31 deletions
... ... @@ -67,8 +67,6 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
67 67 private static final int REQUEST_ID_POSITION_CERTIFICATE_REQUEST = 4;
68 68 private static final String DTLS_SESSION_ID_KEY = "DTLS_SESSION_ID";
69 69
70   - private final ConcurrentMap<TbCoapClientState, ObserveRelation> sessionInfoToObserveRelationMap = new ConcurrentHashMap<>();
71   -
72 70 private final ConcurrentMap<String, TbCoapDtlsSessionInfo> dtlsSessionIdMap;
73 71 private final long timeout;
74 72 private final CoapClientContext clients;
... ...
... ... @@ -49,4 +49,6 @@ public interface CoapTransportAdaptor {
49 49
50 50 ProvisionDeviceRequestMsg convertToProvisionRequestMsg(UUID sessionId, Request inbound) throws AdaptorException;
51 51
  52 + int getContentFormat();
  53 +
52 54 }
... ...
... ... @@ -23,6 +23,7 @@ import com.google.protobuf.Descriptors;
23 23 import com.google.protobuf.DynamicMessage;
24 24 import lombok.extern.slf4j.Slf4j;
25 25 import org.eclipse.californium.core.coap.CoAP;
  26 +import org.eclipse.californium.core.coap.MediaTypeRegistry;
26 27 import org.eclipse.californium.core.coap.Request;
27 28 import org.eclipse.californium.core.coap.Response;
28 29 import org.springframework.stereotype.Component;
... ... @@ -164,4 +165,9 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
164 165 return payload;
165 166 }
166 167
  168 + @Override
  169 + public int getContentFormat() {
  170 + return MediaTypeRegistry.APPLICATION_JSON;
  171 + }
  172 +
167 173 }
... ...
... ... @@ -23,6 +23,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
23 23 import com.google.protobuf.util.JsonFormat;
24 24 import lombok.extern.slf4j.Slf4j;
25 25 import org.eclipse.californium.core.coap.CoAP;
  26 +import org.eclipse.californium.core.coap.MediaTypeRegistry;
26 27 import org.eclipse.californium.core.coap.Request;
27 28 import org.eclipse.californium.core.coap.Response;
28 29 import org.springframework.stereotype.Component;
... ... @@ -162,4 +163,8 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor {
162 163 return JsonFormat.printer().includingDefaultValueFields().print(dynamicMessage);
163 164 }
164 165
  166 + @Override
  167 + public int getContentFormat() {
  168 + return MediaTypeRegistry.APPLICATION_OCTET_STREAM;
  169 + }
165 170 }
... ...
... ... @@ -17,11 +17,14 @@ package org.thingsboard.server.transport.coap.callback;
17 17
18 18 import lombok.RequiredArgsConstructor;
19 19 import lombok.extern.slf4j.Slf4j;
  20 +import org.eclipse.californium.core.coap.MediaTypeRegistry;
20 21 import org.eclipse.californium.core.coap.Request;
  22 +import org.eclipse.californium.core.coap.Response;
21 23 import org.eclipse.californium.core.server.resources.CoapExchange;
22 24 import org.thingsboard.server.common.transport.SessionMsgListener;
23 25 import org.thingsboard.server.gen.transport.TransportProtos;
24 26 import org.thingsboard.server.transport.coap.client.TbCoapClientState;
  27 +import org.thingsboard.server.transport.coap.client.TbCoapContentFormatUtil;
25 28 import org.thingsboard.server.transport.coap.client.TbCoapObservationState;
26 29
27 30 import java.util.UUID;
... ... @@ -71,4 +74,9 @@ public abstract class AbstractSyncSessionCallback implements SessionMsgListener
71 74 }
72 75 }
73 76
  77 + protected void respond(Response response) {
  78 + response.getOptions().setContentFormat(TbCoapContentFormatUtil.getContentFormat(exchange.getRequestOptions().getContentFormat(), state.getContentFormat()));
  79 + exchange.respond(response);
  80 + }
  81 +
74 82 }
... ...
... ... @@ -34,7 +34,7 @@ public class GetAttributesSyncSessionCallback extends AbstractSyncSessionCallbac
34 34 @Override
35 35 public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg msg) {
36 36 try {
37   - exchange.respond(state.getAdaptor().convertToPublish(AbstractSyncSessionCallback.isConRequest(state.getAttrs()), msg));
  37 + respond(state.getAdaptor().convertToPublish(request.isConfirmable(), msg));
38 38 } catch (AdaptorException e) {
39 39 log.trace("[{}] Failed to reply due to error", state.getDeviceId(), e);
40 40 exchange.respond(new Response(CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
... ...
... ... @@ -33,7 +33,7 @@ public class ToServerRpcSyncSessionCallback extends AbstractSyncSessionCallback
33 33 @Override
34 34 public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg toServerResponse) {
35 35 try {
36   - exchange.respond(state.getAdaptor().convertToPublish(isConRequest(state.getRpc()), toServerResponse));
  36 + respond(state.getAdaptor().convertToPublish(request.isConfirmable(), toServerResponse));
37 37 } catch (AdaptorException e) {
38 38 log.trace("Failed to reply due to error", e);
39 39 exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
... ...
... ... @@ -18,6 +18,7 @@ package org.thingsboard.server.transport.coap.client;
18 18 import lombok.RequiredArgsConstructor;
19 19 import lombok.extern.slf4j.Slf4j;
20 20 import org.eclipse.californium.core.coap.CoAP;
  21 +import org.eclipse.californium.core.coap.MediaTypeRegistry;
21 22 import org.eclipse.californium.core.coap.Response;
22 23 import org.eclipse.californium.core.observe.ObserveRelation;
23 24 import org.eclipse.californium.core.server.resources.CoapExchange;
... ... @@ -328,8 +329,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
328 329 state.lock();
329 330 try {
330 331 if (state.getConfiguration() == null || state.getAdaptor() == null) {
331   - state.setConfiguration(getTransportConfigurationContainer(deviceProfile));
332   - state.setAdaptor(getCoapTransportAdaptor(state.getConfiguration().isJsonPayload()));
  332 + initStateAdaptor(deviceProfile, state);
333 333 }
334 334 if (state.getCredentials() == null) {
335 335 state.init(deviceCredentials);
... ... @@ -393,6 +393,12 @@ public class DefaultCoapClientContext implements CoapClientContext {
393 393 }
394 394 }
395 395
  396 + private void initStateAdaptor(DeviceProfile deviceProfile, TbCoapClientState state) throws AdaptorException {
  397 + state.setConfiguration(getTransportConfigurationContainer(deviceProfile));
  398 + state.setAdaptor(getCoapTransportAdaptor(state.getConfiguration().isJsonPayload()));
  399 + state.setContentFormat(state.getAdaptor().getContentFormat());
  400 + }
  401 +
396 402 private CoapTransportAdaptor getCoapTransportAdaptor(boolean jsonPayloadType) {
397 403 return jsonPayloadType ? transportContext.getJsonCoapAdaptor() : transportContext.getProtoCoapAdaptor();
398 404 }
... ... @@ -409,7 +415,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
409 415 try {
410 416 boolean conRequest = AbstractSyncSessionCallback.isConRequest(state.getAttrs());
411 417 Response response = state.getAdaptor().convertToPublish(conRequest, msg);
412   - attrs.getExchange().respond(response);
  418 + respond(attrs.getExchange(), response, state.getContentFormat());
413 419 } catch (AdaptorException e) {
414 420 log.trace("Failed to reply due to error", e);
415 421 cancelObserveRelation(attrs);
... ... @@ -440,10 +446,10 @@ public class DefaultCoapClientContext implements CoapClientContext {
440 446 int requestId = getNextMsgId();
441 447 Response response = state.getAdaptor().convertToPublish(conRequest, msg);
442 448 response.setMID(requestId);
443   - attrs.getExchange().respond(response);
444 449 if (conRequest) {
445 450 response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> awake(state), id -> asleep(state)));
446 451 }
  452 + respond(attrs.getExchange(), response, state.getContentFormat());
447 453 } catch (AdaptorException e) {
448 454 log.trace("[{}] Failed to reply due to error", state.getDeviceId(), e);
449 455 cancelObserveRelation(attrs);
... ... @@ -455,7 +461,23 @@ public class DefaultCoapClientContext implements CoapClientContext {
455 461 }
456 462
457 463 @Override
  464 + public void onDeviceProfileUpdate(TransportProtos.SessionInfoProto newSessionInfo, DeviceProfile deviceProfile) {
  465 + try {
  466 + initStateAdaptor(deviceProfile, state);
  467 + } catch (AdaptorException e) {
  468 + log.warn("[{}] Failed to update device profile: ", deviceProfile.getId(), e);
  469 + }
  470 + }
  471 +
  472 + @Override
458 473 public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
  474 + if (deviceProfileOpt.isPresent()) {
  475 + try {
  476 + initStateAdaptor(deviceProfileOpt.get(), state);
  477 + } catch (AdaptorException e) {
  478 + log.warn("[{}] Failed to update device: ", device.getId(), e);
  479 + }
  480 + }
459 481 state.onDeviceUpdate(device);
460 482 }
461 483
... ... @@ -503,7 +525,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
503 525 if (conRequest) {
504 526 response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> awake(state), id -> asleep(state)));
505 527 }
506   - state.getRpc().getExchange().respond(response);
  528 + respond(state.getRpc().getExchange(), response, state.getContentFormat());
507 529 sent = true;
508 530 } catch (AdaptorException e) {
509 531 log.trace("Failed to reply due to error", e);
... ... @@ -705,4 +727,9 @@ public class DefaultCoapClientContext implements CoapClientContext {
705 727 state.setAdaptor(null);
706 728 //TODO: add optimistic lock check that the client was already deleted and cleanup "clients" map.
707 729 }
  730 +
  731 + private void respond(CoapExchange exchange, Response response, int defContentFormat) {
  732 + response.getOptions().setContentFormat(TbCoapContentFormatUtil.getContentFormat(exchange.getRequestOptions().getContentFormat(), defContentFormat));
  733 + exchange.respond(response);
  734 + }
708 735 }
... ...
... ... @@ -18,26 +18,21 @@ package org.thingsboard.server.transport.coap.client;
18 18 import lombok.Data;
19 19 import lombok.Getter;
20 20 import lombok.Setter;
21   -import org.eclipse.leshan.server.registration.Registration;
22 21 import org.thingsboard.server.common.data.Device;
23 22 import org.thingsboard.server.common.data.DeviceTransportType;
24 23 import org.thingsboard.server.common.data.device.data.CoapDeviceTransportConfiguration;
25 24 import org.thingsboard.server.common.data.device.data.PowerMode;
26 25 import org.thingsboard.server.common.data.id.DeviceId;
27 26 import org.thingsboard.server.common.data.id.DeviceProfileId;
28   -import org.thingsboard.server.common.data.id.TenantId;
29 27 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
30 28 import org.thingsboard.server.gen.transport.TransportProtos;
31 29 import org.thingsboard.server.transport.coap.TransportConfigurationContainer;
32 30 import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
33 31
34   -import java.util.ArrayList;
35 32 import java.util.HashMap;
36 33 import java.util.HashSet;
37   -import java.util.List;
38 34 import java.util.Map;
39 35 import java.util.Set;
40   -import java.util.UUID;
41 36 import java.util.concurrent.Future;
42 37 import java.util.concurrent.locks.Lock;
43 38 import java.util.concurrent.locks.ReentrantLock;
... ... @@ -55,6 +50,8 @@ public class TbCoapClientState {
55 50 private volatile DefaultCoapClientContext.CoapSessionListener listener;
56 51 private volatile TbCoapObservationState attrs;
57 52 private volatile TbCoapObservationState rpc;
  53 + private volatile int contentFormat;
  54 +
58 55 private TransportProtos.AttributeUpdateNotificationMsg missedAttributeUpdates;
59 56
60 57 private DeviceProfileId profileId;
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.coap.client;
  17 +
  18 +import org.eclipse.californium.core.coap.MediaTypeRegistry;
  19 +
  20 +public class TbCoapContentFormatUtil {
  21 +
  22 + public static int getContentFormat(int requestFormat, int adaptorFormat) {
  23 + if (isStrict(adaptorFormat)) {
  24 + return adaptorFormat;
  25 + } else {
  26 + return requestFormat != MediaTypeRegistry.UNDEFINED ? requestFormat : adaptorFormat;
  27 + }
  28 + }
  29 +
  30 + public static boolean isStrict(int contentFormat) {
  31 + return contentFormat == MediaTypeRegistry.APPLICATION_OCTET_STREAM;
  32 + }
  33 +}
... ...
... ... @@ -137,17 +137,17 @@ public class LwM2mTransportCoapResource extends AbstractLwM2mTransportResource {
137 137 );
138 138 UUID currentId = UUID.fromString(idStr);
139 139 Response response = new Response(CoAP.ResponseCode.CONTENT);
140   - byte[] fwData = this.getOtaData(currentId);
141   - log.debug("Read softWare data (length): [{}]", fwData.length);
142   - if (fwData != null && fwData.length > 0) {
143   - response.setPayload(fwData);
  140 + byte[] otaData = this.getOtaData(currentId);
  141 + log.debug("Read ota data (length): [{}]", otaData.length);
  142 + if (otaData.length > 0) {
  143 + response.setPayload(otaData);
144 144 if (exchange.getRequestOptions().getBlock2() != null) {
145 145 int chunkSize = exchange.getRequestOptions().getBlock2().getSzx();
146   - boolean lastFlag = fwData.length <= chunkSize;
  146 + boolean lastFlag = otaData.length <= chunkSize;
147 147 response.getOptions().setBlock2(chunkSize, lastFlag, 0);
148   - log.trace("With block2 Send currentId: [{}], length: [{}], chunkSize [{}], moreFlag [{}]", currentId.toString(), fwData.length, chunkSize, lastFlag);
  148 + log.trace("With block2 Send currentId: [{}], length: [{}], chunkSize [{}], moreFlag [{}]", currentId.toString(), otaData.length, chunkSize, lastFlag);
149 149 } else {
150   - log.trace("With block1 Send currentId: [{}], length: [{}], ", currentId.toString(), fwData.length);
  150 + log.trace("With block1 Send currentId: [{}], length: [{}], ", currentId.toString(), otaData.length);
151 151 }
152 152 exchange.respond(response);
153 153 }
... ...
... ... @@ -317,7 +317,6 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
317 317 this.updateResourcesValue(lwM2MClient, lwM2mResource, path);
318 318 }
319 319 }
320   - clientContext.update(lwM2MClient);
321 320 if (clientContext.awake(lwM2MClient)) {
322 321 // clientContext.awake calls clientContext.update
323 322 log.debug("[{}] Device is awake", lwM2MClient.getEndpoint());
... ...
... ... @@ -39,10 +39,11 @@
39 39 <javax-annotation.version>1.3.2</javax-annotation.version>
40 40 <jakarta.xml.bind-api.version>2.3.2</jakarta.xml.bind-api.version>
41 41 <jaxb-runtime.version>2.3.2</jaxb-runtime.version>
42   - <spring-boot.version>2.3.9.RELEASE</spring-boot.version>
43   - <spring.version>5.2.10.RELEASE</spring.version>
  42 + <spring-boot.version>2.3.12.RELEASE</spring-boot.version>
  43 + <spring.version>5.2.16.RELEASE</spring.version>
  44 + <spring-redis.version>5.2.11.RELEASE</spring-redis.version>
44 45 <spring-security.version>5.4.1</spring-security.version>
45   - <spring-data-redis.version>2.4.1</spring-data-redis.version>
  46 + <spring-data-redis.version>2.4.3</spring-data-redis.version>
46 47 <jedis.version>3.3.0</jedis.version>
47 48 <jjwt.version>0.7.0</jjwt.version>
48 49 <json-path.version>2.2.0</json-path.version>
... ... @@ -62,7 +63,7 @@
62 63 <guava.version>28.2-jre</guava.version>
63 64 <caffeine.version>2.6.1</caffeine.version>
64 65 <commons-lang3.version>3.4</commons-lang3.version>
65   - <commons-io.version>2.5</commons-io.version>
  66 + <commons-io.version>2.11.0</commons-io.version>
66 67 <commons-csv.version>1.4</commons-csv.version>
67 68 <jackson.version>2.12.1</jackson.version>
68 69 <jackson-annotations.version>2.12.1</jackson-annotations.version>
... ... @@ -79,7 +80,7 @@
79 80 <grpc.version>1.38.0</grpc.version>
80 81 <lombok.version>1.18.18</lombok.version>
81 82 <paho.client.version>1.2.4</paho.client.version>
82   - <netty.version>4.1.60.Final</netty.version>
  83 + <netty.version>4.1.66.Final</netty.version>
83 84 <os-maven-plugin.version>1.7.0</os-maven-plugin.version>
84 85 <rabbitmq.version>4.8.0</rabbitmq.version>
85 86 <surfire.version>2.19.1</surfire.version>
... ... @@ -115,7 +116,7 @@
115 116 <micrometer.version>1.5.2</micrometer.version>
116 117 <protobuf-dynamic.version>1.0.3TB</protobuf-dynamic.version>
117 118 <wire-schema.version>3.4.0</wire-schema.version>
118   - <twilio.version>7.54.2</twilio.version>
  119 + <twilio.version>8.17.0</twilio.version>
119 120 <hibernate-validator.version>6.0.13.Final</hibernate-validator.version>
120 121 <javax.el.version>3.0.0</javax.el.version>
121 122 <javax.validation-api.version>2.0.1.Final</javax.validation-api.version>
... ... @@ -1504,7 +1505,7 @@
1504 1505 <dependency>
1505 1506 <groupId>org.springframework.integration</groupId>
1506 1507 <artifactId>spring-integration-redis</artifactId>
1507   - <version>${spring.version}</version>
  1508 + <version>${spring-redis.version}</version>
1508 1509 </dependency>
1509 1510 <dependency>
1510 1511 <groupId>redis.clients</groupId>
... ... @@ -1678,6 +1679,10 @@
1678 1679 <groupId>com.github.spotbugs</groupId>
1679 1680 <artifactId>spotbugs-annotations</artifactId>
1680 1681 </exclusion>
  1682 + <exclusion>
  1683 + <groupId>commons-io</groupId>
  1684 + <artifactId>commons-io</artifactId>
  1685 + </exclusion>
1681 1686 </exclusions>
1682 1687 </dependency>
1683 1688 <dependency>
... ...
... ... @@ -43,7 +43,7 @@ public class DictionaryParser {
43 43 return line.startsWith("COPY public.ts_kv_dictionary (");
44 44 }
45 45
46   - private void parseDictionaryDump(LineIterator iterator) {
  46 + private void parseDictionaryDump(LineIterator iterator) throws IOException {
47 47 try {
48 48 String tempLine;
49 49 while (iterator.hasNext()) {
... ...
... ... @@ -58,7 +58,7 @@ public class RelatedEntitiesParser {
58 58 return StringUtils.isBlank(line) || line.equals("\\.");
59 59 }
60 60
61   - private void processAllTables(LineIterator lineIterator) {
  61 + private void processAllTables(LineIterator lineIterator) throws IOException {
62 62 String currentLine;
63 63 try {
64 64 while (lineIterator.hasNext()) {
... ...