Commit 16686afeae1865317de82d2b9862c61cf337addb

Authored by Igor Kulikov
1 parent ee774442

Add max string value lenght parameter for attributes/timeseries. Improve WS updates handling.

@@ -24,6 +24,7 @@ import com.google.gson.JsonElement; @@ -24,6 +24,7 @@ import com.google.gson.JsonElement;
24 import com.google.gson.JsonParser; 24 import com.google.gson.JsonParser;
25 import lombok.extern.slf4j.Slf4j; 25 import lombok.extern.slf4j.Slf4j;
26 import org.springframework.beans.factory.annotation.Autowired; 26 import org.springframework.beans.factory.annotation.Autowired;
  27 +import org.springframework.beans.factory.annotation.Value;
27 import org.springframework.http.HttpStatus; 28 import org.springframework.http.HttpStatus;
28 import org.springframework.http.ResponseEntity; 29 import org.springframework.http.ResponseEntity;
29 import org.springframework.security.access.prepost.PreAuthorize; 30 import org.springframework.security.access.prepost.PreAuthorize;
@@ -99,6 +100,9 @@ public class TelemetryController extends BaseController { @@ -99,6 +100,9 @@ public class TelemetryController extends BaseController {
99 @Autowired 100 @Autowired
100 private AccessValidator accessValidator; 101 private AccessValidator accessValidator;
101 102
  103 + @Value("${transport.json.max_string_value_length:0}")
  104 + private int maxStringValueLength;
  105 +
102 private ExecutorService executor; 106 private ExecutorService executor;
103 107
104 @PostConstruct 108 @PostConstruct
@@ -628,6 +632,10 @@ public class TelemetryController extends BaseController { @@ -628,6 +632,10 @@ public class TelemetryController extends BaseController {
628 String key = entry.getKey(); 632 String key = entry.getKey();
629 JsonNode value = entry.getValue(); 633 JsonNode value = entry.getValue();
630 if (entry.getValue().isTextual()) { 634 if (entry.getValue().isTextual()) {
  635 + if (maxStringValueLength > 0 && entry.getValue().textValue().length() > maxStringValueLength) {
  636 + String message = String.format("String value length [%d] for key [%s] is greater than maximum allowed [%d]", entry.getValue().textValue().length(), key, maxStringValueLength);
  637 + throw new UncheckedApiException(new InvalidParametersException(message));
  638 + }
631 attributes.add(new BaseAttributeKvEntry(new StringDataEntry(key, value.textValue()), ts)); 639 attributes.add(new BaseAttributeKvEntry(new StringDataEntry(key, value.textValue()), ts));
632 } else if (entry.getValue().isBoolean()) { 640 } else if (entry.getValue().isBoolean()) {
633 attributes.add(new BaseAttributeKvEntry(new BooleanDataEntry(key, value.booleanValue()), ts)); 641 attributes.add(new BaseAttributeKvEntry(new BooleanDataEntry(key, value.booleanValue()), ts));
@@ -200,7 +200,12 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr @@ -200,7 +200,12 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
200 } 200 }
201 } 201 }
202 synchronized (sessionMd) { 202 synchronized (sessionMd) {
  203 + long start = System.currentTimeMillis();
203 sessionMd.session.sendMessage(new TextMessage(msg)); 204 sessionMd.session.sendMessage(new TextMessage(msg));
  205 + long took = System.currentTimeMillis() - start;
  206 + if (took >= 1000) {
  207 + log.info("[{}][{}] Sending message took more than 1 second [{}ms] {}", sessionRef.getSecurityCtx().getTenantId(), externalId, took, msg);
  208 + }
204 } 209 }
205 } else { 210 } else {
206 log.warn("[{}][{}] Failed to find session by internal id", externalId, internalId); 211 log.warn("[{}][{}] Failed to find session by internal id", externalId, internalId);
@@ -581,13 +581,15 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -581,13 +581,15 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
581 } 581 }
582 582
583 private void sendWsMsg(TelemetryWebSocketSessionRef sessionRef, SubscriptionUpdate update) { 583 private void sendWsMsg(TelemetryWebSocketSessionRef sessionRef, SubscriptionUpdate update) {
584 - try {  
585 - msgEndpoint.send(sessionRef, update.getSubscriptionId(), jsonMapper.writeValueAsString(update));  
586 - } catch (JsonProcessingException e) {  
587 - log.warn("[{}] Failed to encode reply: {}", sessionRef.getSessionId(), update, e);  
588 - } catch (IOException e) {  
589 - log.warn("[{}] Failed to send reply: {}", sessionRef.getSessionId(), update, e);  
590 - } 584 + executor.submit(() -> {
  585 + try {
  586 + msgEndpoint.send(sessionRef, update.getSubscriptionId(), jsonMapper.writeValueAsString(update));
  587 + } catch (JsonProcessingException e) {
  588 + log.warn("[{}] Failed to encode reply: {}", sessionRef.getSessionId(), update, e);
  589 + } catch (IOException e) {
  590 + log.warn("[{}] Failed to send reply: {}", sessionRef.getSessionId(), update, e);
  591 + }
  592 + });
591 } 593 }
592 594
593 private static Optional<Set<String>> getKeys(TelemetryPluginCmd cmd) { 595 private static Optional<Set<String>> getKeys(TelemetryPluginCmd cmd) {
@@ -439,6 +439,8 @@ transport: @@ -439,6 +439,8 @@ transport:
439 json: 439 json:
440 # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON 440 # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON
441 type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}" 441 type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"
  442 + # Maximum allowed string value length when processing Telemetry/Attributes JSON (0 value disables string value length check)
  443 + max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}"
442 # Local HTTP transport parameters 444 # Local HTTP transport parameters
443 http: 445 http:
444 enabled: "${HTTP_ENABLED:true}" 446 enabled: "${HTTP_ENABLED:true}"
@@ -61,6 +61,8 @@ public class JsonConverter { @@ -61,6 +61,8 @@ public class JsonConverter {
61 61
62 private static boolean isTypeCastEnabled = true; 62 private static boolean isTypeCastEnabled = true;
63 63
  64 + private static int maxStringValueLength = 0;
  65 +
64 public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonObject) throws JsonSyntaxException { 66 public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonObject) throws JsonSyntaxException {
65 long systemTs = System.currentTimeMillis(); 67 long systemTs = System.currentTimeMillis();
66 PostTelemetryMsg.Builder builder = PostTelemetryMsg.newBuilder(); 68 PostTelemetryMsg.Builder builder = PostTelemetryMsg.newBuilder();
@@ -131,6 +133,10 @@ public class JsonConverter { @@ -131,6 +133,10 @@ public class JsonConverter {
131 if (element.isJsonPrimitive()) { 133 if (element.isJsonPrimitive()) {
132 JsonPrimitive value = element.getAsJsonPrimitive(); 134 JsonPrimitive value = element.getAsJsonPrimitive();
133 if (value.isString()) { 135 if (value.isString()) {
  136 + if (maxStringValueLength > 0 && value.getAsString().length() > maxStringValueLength) {
  137 + String message = String.format("String value length [%d] for key [%s] is greater than maximum allowed [%d]", value.getAsString().length(), valueEntry.getKey(), maxStringValueLength);
  138 + throw new JsonSyntaxException(message);
  139 + }
134 if(isTypeCastEnabled && NumberUtils.isParsable(value.getAsString())) { 140 if(isTypeCastEnabled && NumberUtils.isParsable(value.getAsString())) {
135 try { 141 try {
136 result.add(buildNumericKeyValueProto(value, valueEntry.getKey())); 142 result.add(buildNumericKeyValueProto(value, valueEntry.getKey()));
@@ -389,6 +395,10 @@ public class JsonConverter { @@ -389,6 +395,10 @@ public class JsonConverter {
389 if (element.isJsonPrimitive()) { 395 if (element.isJsonPrimitive()) {
390 JsonPrimitive value = element.getAsJsonPrimitive(); 396 JsonPrimitive value = element.getAsJsonPrimitive();
391 if (value.isString()) { 397 if (value.isString()) {
  398 + if (maxStringValueLength > 0 && value.getAsString().length() > maxStringValueLength) {
  399 + String message = String.format("String value length [%d] for key [%s] is greater than maximum allowed [%d]", value.getAsString().length(), valueEntry.getKey(), maxStringValueLength);
  400 + throw new JsonSyntaxException(message);
  401 + }
392 if(isTypeCastEnabled && NumberUtils.isParsable(value.getAsString())) { 402 if(isTypeCastEnabled && NumberUtils.isParsable(value.getAsString())) {
393 try { 403 try {
394 parseNumericValue(result, valueEntry, value); 404 parseNumericValue(result, valueEntry, value);
@@ -456,4 +466,9 @@ public class JsonConverter { @@ -456,4 +466,9 @@ public class JsonConverter {
456 public static void setTypeCastEnabled(boolean enabled) { 466 public static void setTypeCastEnabled(boolean enabled) {
457 isTypeCastEnabled = enabled; 467 isTypeCastEnabled = enabled;
458 } 468 }
  469 +
  470 + public static void setMaxStringValueLength(int length) {
  471 + maxStringValueLength = length;
  472 + }
  473 +
459 } 474 }
@@ -28,4 +28,10 @@ public class JsonConverterConfig { @@ -28,4 +28,10 @@ public class JsonConverterConfig {
28 JsonConverter.setTypeCastEnabled(jsonTypeCastEnabled); 28 JsonConverter.setTypeCastEnabled(jsonTypeCastEnabled);
29 log.info("JSON type cast enabled = {}", jsonTypeCastEnabled); 29 log.info("JSON type cast enabled = {}", jsonTypeCastEnabled);
30 } 30 }
  31 +
  32 + @Value("${transport.json.max_string_value_length:0}")
  33 + public void setMaxStringValueLength(int maxStringValueLength) {
  34 + JsonConverter.setMaxStringValueLength(maxStringValueLength);
  35 + log.info("JSON max string value length = {}", maxStringValueLength);
  36 + }
31 } 37 }