Commit b25ea1e6136b0a6b45b047f0ec5b663c3787ff42

Authored by Dima Landiak
Committed by Andrew Shvayka
1 parent 85b63c5b

rest api call node added parallel processing logic

@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j; @@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
23 import org.springframework.http.HttpEntity; 23 import org.springframework.http.HttpEntity;
24 import org.springframework.http.HttpHeaders; 24 import org.springframework.http.HttpHeaders;
25 import org.springframework.http.HttpMethod; 25 import org.springframework.http.HttpMethod;
  26 +import org.springframework.http.HttpStatus;
26 import org.springframework.http.ResponseEntity; 27 import org.springframework.http.ResponseEntity;
27 import org.springframework.http.client.Netty4ClientHttpRequestFactory; 28 import org.springframework.http.client.Netty4ClientHttpRequestFactory;
28 import org.springframework.util.concurrent.ListenableFuture; 29 import org.springframework.util.concurrent.ListenableFuture;
@@ -37,6 +38,8 @@ import org.thingsboard.server.common.msg.TbMsg; @@ -37,6 +38,8 @@ import org.thingsboard.server.common.msg.TbMsg;
37 import org.thingsboard.server.common.msg.TbMsgMetaData; 38 import org.thingsboard.server.common.msg.TbMsgMetaData;
38 39
39 import javax.net.ssl.SSLException; 40 import javax.net.ssl.SSLException;
  41 +import java.util.Deque;
  42 +import java.util.concurrent.ConcurrentLinkedDeque;
40 import java.util.concurrent.TimeUnit; 43 import java.util.concurrent.TimeUnit;
41 44
42 @Data 45 @Data
@@ -50,21 +53,24 @@ class TbHttpClient { @@ -50,21 +53,24 @@ class TbHttpClient {
50 private static final String ERROR_BODY = "error_body"; 53 private static final String ERROR_BODY = "error_body";
51 54
52 private final TbRestApiCallNodeConfiguration config; 55 private final TbRestApiCallNodeConfiguration config;
53 - private final boolean useRedisQueueForMsgPersistence;  
54 56
55 private EventLoopGroup eventLoopGroup; 57 private EventLoopGroup eventLoopGroup;
56 private AsyncRestTemplate httpClient; 58 private AsyncRestTemplate httpClient;
  59 + private Deque<ListenableFuture<ResponseEntity<String>>> pendingFutures;
57 60
58 TbHttpClient(TbRestApiCallNodeConfiguration config) throws TbNodeException { 61 TbHttpClient(TbRestApiCallNodeConfiguration config) throws TbNodeException {
59 try { 62 try {
60 this.config = config; 63 this.config = config;
61 - this.useRedisQueueForMsgPersistence = config.isUseRedisQueueForMsgPersistence(); 64 + if (config.getMaxParallelRequestsCount() > 0) {
  65 + pendingFutures = new ConcurrentLinkedDeque<>();
  66 + }
62 if (config.isUseSimpleClientHttpFactory()) { 67 if (config.isUseSimpleClientHttpFactory()) {
63 httpClient = new AsyncRestTemplate(); 68 httpClient = new AsyncRestTemplate();
64 } else { 69 } else {
65 this.eventLoopGroup = new NioEventLoopGroup(); 70 this.eventLoopGroup = new NioEventLoopGroup();
66 Netty4ClientHttpRequestFactory nettyFactory = new Netty4ClientHttpRequestFactory(this.eventLoopGroup); 71 Netty4ClientHttpRequestFactory nettyFactory = new Netty4ClientHttpRequestFactory(this.eventLoopGroup);
67 nettyFactory.setSslContext(SslContextBuilder.forClient().build()); 72 nettyFactory.setSslContext(SslContextBuilder.forClient().build());
  73 + nettyFactory.setReadTimeout(config.getReadTimeoutMs());
68 httpClient = new AsyncRestTemplate(nettyFactory); 74 httpClient = new AsyncRestTemplate(nettyFactory);
69 } 75 }
70 } catch (SSLException e) { 76 } catch (SSLException e) {
@@ -89,8 +95,12 @@ class TbHttpClient { @@ -89,8 +95,12 @@ class TbHttpClient {
89 future.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() { 95 future.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
90 @Override 96 @Override
91 public void onFailure(Throwable throwable) { 97 public void onFailure(Throwable throwable) {
92 - if (useRedisQueueForMsgPersistence) {  
93 - queueProcessor.pushOnFailure(msg); 98 + if (config.isUseRedisQueueForMsgPersistence()) {
  99 + if (throwable instanceof HttpClientErrorException) {
  100 + processHttpClientError(((HttpClientErrorException) throwable).getStatusCode(), msg, queueProcessor);
  101 + } else {
  102 + queueProcessor.pushOnFailure(msg);
  103 + }
94 } 104 }
95 TbMsg next = processException(ctx, msg, throwable); 105 TbMsg next = processException(ctx, msg, throwable);
96 ctx.tellFailure(next, throwable); 106 ctx.tellFailure(next, throwable);
@@ -99,20 +109,23 @@ class TbHttpClient { @@ -99,20 +109,23 @@ class TbHttpClient {
99 @Override 109 @Override
100 public void onSuccess(ResponseEntity<String> responseEntity) { 110 public void onSuccess(ResponseEntity<String> responseEntity) {
101 if (responseEntity.getStatusCode().is2xxSuccessful()) { 111 if (responseEntity.getStatusCode().is2xxSuccessful()) {
102 - if (useRedisQueueForMsgPersistence) { 112 + if (config.isUseRedisQueueForMsgPersistence()) {
103 queueProcessor.resetCounter(); 113 queueProcessor.resetCounter();
104 } 114 }
105 TbMsg next = processResponse(ctx, msg, responseEntity); 115 TbMsg next = processResponse(ctx, msg, responseEntity);
106 ctx.tellNext(next, TbRelationTypes.SUCCESS); 116 ctx.tellNext(next, TbRelationTypes.SUCCESS);
107 } else { 117 } else {
108 - if (useRedisQueueForMsgPersistence) {  
109 - queueProcessor.pushOnFailure(msg); 118 + if (config.isUseRedisQueueForMsgPersistence()) {
  119 + processHttpClientError(responseEntity.getStatusCode(), msg, queueProcessor);
110 } 120 }
111 TbMsg next = processFailureResponse(ctx, msg, responseEntity); 121 TbMsg next = processFailureResponse(ctx, msg, responseEntity);
112 ctx.tellNext(next, TbRelationTypes.FAILURE); 122 ctx.tellNext(next, TbRelationTypes.FAILURE);
113 } 123 }
114 } 124 }
115 }); 125 });
  126 + if (pendingFutures != null) {
  127 + processParallelRequests(future);
  128 + }
116 } 129 }
117 130
118 private TbMsg processResponse(TbContext ctx, TbMsg origMsg, ResponseEntity<String> response) { 131 private TbMsg processResponse(TbContext ctx, TbMsg origMsg, ResponseEntity<String> response) {
@@ -150,4 +163,31 @@ class TbHttpClient { @@ -150,4 +163,31 @@ class TbHttpClient {
150 config.getHeaders().forEach((k, v) -> headers.add(TbNodeUtils.processPattern(k, metaData), TbNodeUtils.processPattern(v, metaData))); 163 config.getHeaders().forEach((k, v) -> headers.add(TbNodeUtils.processPattern(k, metaData), TbNodeUtils.processPattern(v, metaData)));
151 return headers; 164 return headers;
152 } 165 }
  166 +
  167 + private void processParallelRequests(ListenableFuture<ResponseEntity<String>> future) {
  168 + pendingFutures.add(future);
  169 + if (pendingFutures.size() > config.getMaxParallelRequestsCount()) {
  170 + for (int i = 0; i < config.getMaxParallelRequestsCount(); i++) {
  171 + try {
  172 + ListenableFuture<ResponseEntity<String>> pendingFuture = pendingFutures.removeFirst();
  173 + try {
  174 + pendingFuture.get(config.getReadTimeoutMs(), TimeUnit.MILLISECONDS);
  175 + } catch (Exception e) {
  176 + log.warn("Timeout during waiting for reply!", e);
  177 + pendingFuture.cancel(true);
  178 + }
  179 + } catch (Exception e) {
  180 + log.warn("Failure during waiting for reply!", e);
  181 + }
  182 + }
  183 + }
  184 + }
  185 +
  186 + private void processHttpClientError(HttpStatus statusCode, TbMsg msg, TbRedisQueueProcessor queueProcessor) {
  187 + if (statusCode.is4xxClientError()) {
  188 + log.warn("[{}] Client error during message delivering!", msg);
  189 + } else {
  190 + queueProcessor.pushOnFailure(msg);
  191 + }
  192 + }
153 } 193 }
@@ -28,6 +28,8 @@ public class TbRestApiCallNodeConfiguration implements NodeConfiguration<TbRestA @@ -28,6 +28,8 @@ public class TbRestApiCallNodeConfiguration implements NodeConfiguration<TbRestA
28 private String requestMethod; 28 private String requestMethod;
29 private Map<String, String> headers; 29 private Map<String, String> headers;
30 private boolean useSimpleClientHttpFactory; 30 private boolean useSimpleClientHttpFactory;
  31 + private int readTimeoutMs;
  32 + private int maxParallelRequestsCount;
31 private boolean useRedisQueueForMsgPersistence; 33 private boolean useRedisQueueForMsgPersistence;
32 private boolean trimQueue; 34 private boolean trimQueue;
33 private int maxQueueSize; 35 private int maxQueueSize;
@@ -39,6 +41,8 @@ public class TbRestApiCallNodeConfiguration implements NodeConfiguration<TbRestA @@ -39,6 +41,8 @@ public class TbRestApiCallNodeConfiguration implements NodeConfiguration<TbRestA
39 configuration.setRequestMethod("POST"); 41 configuration.setRequestMethod("POST");
40 configuration.setHeaders(Collections.emptyMap()); 42 configuration.setHeaders(Collections.emptyMap());
41 configuration.setUseSimpleClientHttpFactory(false); 43 configuration.setUseSimpleClientHttpFactory(false);
  44 + configuration.setReadTimeoutMs(0);
  45 + configuration.setMaxParallelRequestsCount(0);
42 configuration.setUseRedisQueueForMsgPersistence(false); 46 configuration.setUseRedisQueueForMsgPersistence(false);
43 configuration.setTrimQueue(false); 47 configuration.setTrimQueue(false);
44 return configuration; 48 return configuration;