Commit 71324536a30b11a1cdf46af48300686345107ca1

Authored by Andrii Shvaika
1 parent 6758a29c

Additional Kafka settings and logging of time consuming messages

... ... @@ -15,6 +15,8 @@
15 15 */
16 16 package org.thingsboard.server.service.queue;
17 17
  18 +import lombok.Getter;
  19 +import lombok.Setter;
18 20 import lombok.extern.slf4j.Slf4j;
19 21 import org.springframework.beans.factory.annotation.Value;
20 22 import org.springframework.boot.context.event.ApplicationReadyEvent;
... ... @@ -76,6 +78,7 @@ import java.util.concurrent.ConcurrentMap;
76 78 import java.util.concurrent.CountDownLatch;
77 79 import java.util.concurrent.ExecutorService;
78 80 import java.util.concurrent.Executors;
  81 +import java.util.concurrent.Future;
79 82 import java.util.concurrent.TimeUnit;
80 83 import java.util.function.Function;
81 84 import java.util.stream.Collectors;
... ... @@ -175,39 +178,48 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
175 178 CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
176 179 TbPackProcessingContext<TbProtoQueueMsg<ToCoreMsg>> ctx = new TbPackProcessingContext<>(
177 180 processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());
178   - pendingMap.forEach((id, msg) -> {
179   - log.trace("[{}] Creating main callback for message: {}", id, msg.getValue());
180   - TbCallback callback = new TbPackCallback<>(id, ctx);
181   - try {
182   - ToCoreMsg toCoreMsg = msg.getValue();
183   - if (toCoreMsg.hasToSubscriptionMgrMsg()) {
184   - log.trace("[{}] Forwarding message to subscription manager service {}", id, toCoreMsg.getToSubscriptionMgrMsg());
185   - forwardToSubMgrService(toCoreMsg.getToSubscriptionMgrMsg(), callback);
186   - } else if (toCoreMsg.hasToDeviceActorMsg()) {
187   - log.trace("[{}] Forwarding message to device actor {}", id, toCoreMsg.getToDeviceActorMsg());
188   - forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
189   - } else if (toCoreMsg.hasDeviceStateServiceMsg()) {
190   - log.trace("[{}] Forwarding message to state service {}", id, toCoreMsg.getDeviceStateServiceMsg());
191   - forwardToStateService(toCoreMsg.getDeviceStateServiceMsg(), callback);
192   - } else if (toCoreMsg.getToDeviceActorNotificationMsg() != null && !toCoreMsg.getToDeviceActorNotificationMsg().isEmpty()) {
193   - Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray());
194   - if (actorMsg.isPresent()) {
195   - TbActorMsg tbActorMsg = actorMsg.get();
196   - if (tbActorMsg.getMsgType().equals(MsgType.DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG)) {
197   - tbCoreDeviceRpcService.forwardRpcRequestToDeviceActor((ToDeviceRpcRequestActorMsg) tbActorMsg);
198   - } else {
199   - log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
200   - actorContext.tell(actorMsg.get());
  181 + PendingMsgHolder pendingMsgHolder = new PendingMsgHolder();
  182 + Future<?> packSubmitFuture = consumersExecutor.submit(() -> {
  183 + pendingMap.forEach((id, msg) -> {
  184 + log.trace("[{}] Creating main callback for message: {}", id, msg.getValue());
  185 + TbCallback callback = new TbPackCallback<>(id, ctx);
  186 + try {
  187 + ToCoreMsg toCoreMsg = msg.getValue();
  188 + pendingMsgHolder.setToCoreMsg(toCoreMsg);
  189 + if (toCoreMsg.hasToSubscriptionMgrMsg()) {
  190 + log.trace("[{}] Forwarding message to subscription manager service {}", id, toCoreMsg.getToSubscriptionMgrMsg());
  191 + forwardToSubMgrService(toCoreMsg.getToSubscriptionMgrMsg(), callback);
  192 + } else if (toCoreMsg.hasToDeviceActorMsg()) {
  193 + log.trace("[{}] Forwarding message to device actor {}", id, toCoreMsg.getToDeviceActorMsg());
  194 + forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
  195 + } else if (toCoreMsg.hasDeviceStateServiceMsg()) {
  196 + log.trace("[{}] Forwarding message to state service {}", id, toCoreMsg.getDeviceStateServiceMsg());
  197 + forwardToStateService(toCoreMsg.getDeviceStateServiceMsg(), callback);
  198 + } else if (toCoreMsg.getToDeviceActorNotificationMsg() != null && !toCoreMsg.getToDeviceActorNotificationMsg().isEmpty()) {
  199 + Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray());
  200 + if (actorMsg.isPresent()) {
  201 + TbActorMsg tbActorMsg = actorMsg.get();
  202 + if (tbActorMsg.getMsgType().equals(MsgType.DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG)) {
  203 + tbCoreDeviceRpcService.forwardRpcRequestToDeviceActor((ToDeviceRpcRequestActorMsg) tbActorMsg);
  204 + } else {
  205 + log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
  206 + actorContext.tell(actorMsg.get());
  207 + }
201 208 }
  209 + callback.onSuccess();
202 210 }
203   - callback.onSuccess();
  211 + } catch (Throwable e) {
  212 + log.warn("[{}] Failed to process message: {}", id, msg, e);
  213 + callback.onFailure(e);
204 214 }
205   - } catch (Throwable e) {
206   - log.warn("[{}] Failed to process message: {}", id, msg, e);
207   - callback.onFailure(e);
208   - }
  215 + });
209 216 });
210 217 if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) {
  218 + if (!packSubmitFuture.isDone()) {
  219 + packSubmitFuture.cancel(true);
  220 + ToCoreMsg lastSubmitMsg = pendingMsgHolder.getToCoreMsg();
  221 + log.info("Timeout to process message: {}", lastSubmitMsg);
  222 + }
211 223 ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, msg.getValue()));
212 224 ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process message: {}", id, msg.getValue()));
213 225 }
... ... @@ -227,6 +239,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
227 239 });
228 240 }
229 241
  242 + private static class PendingMsgHolder {
  243 + @Getter
  244 + @Setter
  245 + private volatile ToCoreMsg toCoreMsg;
  246 + }
  247 +
230 248 @Override
231 249 protected ServiceType getServiceType() {
232 250 return ServiceType.TB_CORE;
... ...
... ... @@ -591,6 +591,7 @@ queue:
591 591 linger.ms: "${TB_KAFKA_LINGER_MS:1}"
592 592 buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
593 593 replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}"
  594 + max_poll_interval_ms: "${TB_QUEUE_KAFKA_MAX_POLL_INTERVAL_MS:0}"
594 595 max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}"
595 596 max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}"
596 597 fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
... ...
... ... @@ -46,7 +46,6 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
46 46 private TbKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder,
47 47 String clientId, String groupId, String topic,
48 48 boolean autoCommit, int autoCommitIntervalMs,
49   - int maxPollRecords,
50 49 TbQueueAdmin admin) {
51 50 super(topic);
52 51 Properties props = settings.toProps();
... ... @@ -54,6 +53,9 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
54 53 if (groupId != null) {
55 54 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
56 55 }
  56 + if (settings.getMaxPollIntervalMs() > 0) {
  57 + props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, settings.getMaxPollIntervalMs());
  58 + }
57 59 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, settings.getMaxPollRecords());
58 60 props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, settings.getMaxPartitionFetchBytes());
59 61 props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, settings.getFetchMaxBytes());
... ... @@ -61,9 +63,6 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
61 63 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
62 64 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
63 65 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
64   - if (maxPollRecords > 0) {
65   - props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
66   - }
67 66 this.admin = admin;
68 67 this.consumer = new KafkaConsumer<>(props);
69 68 this.decoder = decoder;
... ...
... ... @@ -63,6 +63,10 @@ public class TbKafkaSettings {
63 63 @Getter
64 64 private int maxPollRecords;
65 65
  66 + @Value("${queue.kafka.max_poll_interval_ms:0}")
  67 + @Getter
  68 + private int maxPollIntervalMs;
  69 +
66 70 @Value("${queue.kafka.max_partition_fetch_bytes:16777216}")
67 71 @Getter
68 72 private int maxPartitionFetchBytes;
... ... @@ -111,4 +115,5 @@ public class TbKafkaSettings {
111 115 }
112 116 return props;
113 117 }
  118 +
114 119 }
... ...