Commit 69a77792530de72ab27a437def806702d032df75

Authored by Sergey Matvienko
Committed by Andrew Shvayka
1 parent bc7cf029

added MqttTransportHandlerTest

  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.mqtt;
  17 +
  18 +import io.netty.buffer.ByteBuf;
  19 +import io.netty.buffer.EmptyByteBuf;
  20 +import io.netty.buffer.PooledByteBufAllocator;
  21 +import io.netty.channel.ChannelHandlerContext;
  22 +import io.netty.handler.codec.mqtt.MqttConnectMessage;
  23 +import io.netty.handler.codec.mqtt.MqttConnectPayload;
  24 +import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
  25 +import io.netty.handler.codec.mqtt.MqttFixedHeader;
  26 +import io.netty.handler.codec.mqtt.MqttMessage;
  27 +import io.netty.handler.codec.mqtt.MqttMessageType;
  28 +import io.netty.handler.codec.mqtt.MqttPublishMessage;
  29 +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
  30 +import io.netty.handler.codec.mqtt.MqttQoS;
  31 +import io.netty.handler.ssl.SslHandler;
  32 +import lombok.extern.slf4j.Slf4j;
  33 +import org.junit.After;
  34 +import org.junit.Before;
  35 +import org.junit.Test;
  36 +import org.junit.runner.RunWith;
  37 +import org.mockito.Mock;
  38 +import org.mockito.junit.MockitoJUnitRunner;
  39 +import org.thingsboard.common.util.ThingsBoardThreadFactory;
  40 +
  41 +import java.net.InetSocketAddress;
  42 +import java.nio.charset.StandardCharsets;
  43 +import java.util.List;
  44 +import java.util.concurrent.CountDownLatch;
  45 +import java.util.concurrent.ExecutorService;
  46 +import java.util.concurrent.Executors;
  47 +import java.util.concurrent.TimeUnit;
  48 +import java.util.concurrent.atomic.AtomicInteger;
  49 +import java.util.stream.Collectors;
  50 +import java.util.stream.Stream;
  51 +
  52 +import static org.hamcrest.MatcherAssert.assertThat;
  53 +import static org.hamcrest.Matchers.contains;
  54 +import static org.hamcrest.Matchers.empty;
  55 +import static org.hamcrest.Matchers.greaterThan;
  56 +import static org.hamcrest.Matchers.is;
  57 +import static org.junit.Assert.fail;
  58 +import static org.mockito.ArgumentMatchers.any;
  59 +import static org.mockito.BDDMockito.willDoNothing;
  60 +import static org.mockito.BDDMockito.willReturn;
  61 +import static org.mockito.Mockito.never;
  62 +import static org.mockito.Mockito.spy;
  63 +import static org.mockito.Mockito.times;
  64 +import static org.mockito.Mockito.verify;
  65 +
  66 +@Slf4j
  67 +@RunWith(MockitoJUnitRunner.class)
  68 +public class MqttTransportHandlerTest {
  69 +
  70 + public static final int MSG_QUEUE_LIMIT = 10;
  71 + public static final InetSocketAddress IP_ADDR = new InetSocketAddress("127.0.0.1", 9876);
  72 + public static final int TIMEOUT = 30;
  73 +
  74 + @Mock
  75 + MqttTransportContext context;
  76 + @Mock
  77 + SslHandler sslHandler;
  78 + @Mock
  79 + ChannelHandlerContext ctx;
  80 +
  81 + AtomicInteger packedId = new AtomicInteger();
  82 + ExecutorService executor;
  83 + MqttTransportHandler handler;
  84 +
  85 + @Before
  86 + public void setUp() throws Exception {
  87 + willReturn(MSG_QUEUE_LIMIT).given(context).getMessageQueueSizePerDeviceLimit();
  88 +
  89 + handler = spy(new MqttTransportHandler(context, sslHandler));
  90 + willReturn(IP_ADDR).given(handler).getAddress(any());
  91 + }
  92 +
  93 + @After
  94 + public void tearDown() {
  95 + if (executor != null) {
  96 + executor.shutdownNow();
  97 + }
  98 + }
  99 +
  100 + @Test
  101 + public void givenMessageWithoutFixedHeader_whenProcessMqttMsg_thenProcessDisconnect() {
  102 + MqttFixedHeader mqttFixedHeader = null;
  103 + MqttMessage msg = new MqttMessage(mqttFixedHeader);
  104 + willDoNothing().given(handler).processDisconnect(ctx);
  105 +
  106 + handler.processMqttMsg(ctx, msg);
  107 +
  108 + assertThat(handler.address, is(IP_ADDR));
  109 + verify(handler, times(1)).processDisconnect(ctx);
  110 + }
  111 +
  112 + MqttConnectMessage getMqttConnectMessage() {
  113 + MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, true, MqttQoS.AT_LEAST_ONCE, false, 123);
  114 + MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader("device", packedId.incrementAndGet(), true, true, true, 1, true, false, 60);
  115 + MqttConnectPayload payload = new MqttConnectPayload("clientId", "topic", "message".getBytes(StandardCharsets.UTF_8), "username", "password".getBytes(StandardCharsets.UTF_8));
  116 + return new MqttConnectMessage(mqttFixedHeader, variableHeader, payload);
  117 + }
  118 +
  119 + MqttPublishMessage getMqttPublishMessage() {
  120 + MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, true, MqttQoS.AT_LEAST_ONCE, false, 123);
  121 + MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("v1/gateway/telemetry", packedId.incrementAndGet());
  122 + ByteBuf payload = new EmptyByteBuf(new PooledByteBufAllocator());
  123 + return new MqttPublishMessage(mqttFixedHeader, variableHeader, payload);
  124 + }
  125 +
  126 + @Test
  127 + public void givenMqttConnectMessage_whenProcessMqttMsg_thenProcessConnect() {
  128 + MqttConnectMessage msg = getMqttConnectMessage();
  129 + willDoNothing().given(handler).processConnect(ctx, msg);
  130 +
  131 + handler.processMqttMsg(ctx, msg);
  132 +
  133 + assertThat(handler.address, is(IP_ADDR));
  134 + assertThat(handler.deviceSessionCtx.getChannel(), is(ctx));
  135 + verify(handler, never()).processDisconnect(any());
  136 + verify(handler, times(1)).processConnect(ctx, msg);
  137 + }
  138 +
  139 + @Test
  140 + public void givenQueueLimit_whenEnqueueRegularSessionMsgOverLimit_thenOK() {
  141 + List<MqttPublishMessage> messages = Stream.generate(this::getMqttPublishMessage).limit(MSG_QUEUE_LIMIT).collect(Collectors.toList());
  142 + messages.forEach(msg -> handler.enqueueRegularSessionMsg(ctx, msg));
  143 + assertThat(handler.deviceSessionCtx.getMsgQueueSize().get(), is(MSG_QUEUE_LIMIT));
  144 + assertThat(handler.deviceSessionCtx.getMsgQueue(), contains(messages.toArray()));
  145 + }
  146 +
  147 + @Test
  148 + public void givenQueueLimit_whenEnqueueRegularSessionMsgOverLimit_thenCtxClose() {
  149 + final int limit = MSG_QUEUE_LIMIT + 1;
  150 + willDoNothing().given(handler).processMsgQueue(ctx);
  151 + List<MqttPublishMessage> messages = Stream.generate(this::getMqttPublishMessage).limit(limit).collect(Collectors.toList());
  152 +
  153 + messages.forEach((msg) -> handler.enqueueRegularSessionMsg(ctx, msg));
  154 +
  155 + assertThat(handler.deviceSessionCtx.getMsgQueueSize().get(), is(limit));
  156 + verify(handler, times(limit)).enqueueRegularSessionMsg(any(), any());
  157 + verify(handler, times(MSG_QUEUE_LIMIT)).processMsgQueue(any());
  158 + verify(ctx, times(1)).close();
  159 + }
  160 +
  161 + @Test
  162 + public void givenMqttConnectMessageAndPublishImmediately_whenProcessMqttMsg_thenEnqueueRegularSessionMsg() {
  163 + givenMqttConnectMessage_whenProcessMqttMsg_thenProcessConnect();
  164 +
  165 + List<MqttPublishMessage> messages = Stream.generate(this::getMqttPublishMessage).limit(MSG_QUEUE_LIMIT).collect(Collectors.toList());
  166 +
  167 + messages.forEach((msg) -> handler.processMqttMsg(ctx, msg));
  168 +
  169 + assertThat(handler.address, is(IP_ADDR));
  170 + assertThat(handler.deviceSessionCtx.getChannel(), is(ctx));
  171 + assertThat(handler.deviceSessionCtx.getMsgQueueSize().get(), is(MSG_QUEUE_LIMIT));
  172 + assertThat(handler.deviceSessionCtx.getMsgQueue(), contains(messages.toArray()));
  173 + verify(handler, never()).processDisconnect(any());
  174 + verify(handler, times(1)).processConnect(any(), any());
  175 + verify(handler, times(MSG_QUEUE_LIMIT)).enqueueRegularSessionMsg(any(), any());
  176 + messages.forEach((msg) -> verify(handler, times(1)).enqueueRegularSessionMsg(ctx, msg));
  177 + }
  178 +
  179 + @Test
  180 + public void givenMessageQueue_whenProcessMqttMsg_thenEnqueueRegularSessionMsg() throws InterruptedException {
  181 + //given
  182 + assertThat(handler.deviceSessionCtx.isConnected(), is(false));
  183 + assertThat(MSG_QUEUE_LIMIT, greaterThan(2));
  184 + List<MqttPublishMessage> messages = Stream.generate(this::getMqttPublishMessage).limit(MSG_QUEUE_LIMIT).collect(Collectors.toList());
  185 + messages.forEach((msg) -> handler.enqueueRegularSessionMsg(ctx, msg));
  186 + willDoNothing().given(handler).processRegularSessionMsg(any(), any());
  187 + executor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName(getClass().getName()));
  188 +
  189 + CountDownLatch readyLatch = new CountDownLatch(MSG_QUEUE_LIMIT);
  190 + CountDownLatch startLatch = new CountDownLatch(1);
  191 + CountDownLatch finishLatch = new CountDownLatch(MSG_QUEUE_LIMIT);
  192 +
  193 + Stream.iterate(0, i -> i + 1).limit(MSG_QUEUE_LIMIT).forEach(x ->
  194 + executor.submit(() -> {
  195 + try {
  196 + readyLatch.countDown();
  197 + assertThat(startLatch.await(TIMEOUT, TimeUnit.SECONDS), is(true));
  198 + handler.processMsgQueue(ctx);
  199 + finishLatch.countDown();
  200 + } catch (Exception e) {
  201 + log.error("Failed to run processMsgQueue", e);
  202 + fail("Failed to run processMsgQueue");
  203 + }
  204 + }));
  205 +
  206 + //when
  207 + assertThat(readyLatch.await(TIMEOUT, TimeUnit.SECONDS), is(true));
  208 + handler.deviceSessionCtx.setConnected(true);
  209 + startLatch.countDown();
  210 + assertThat(finishLatch.await(TIMEOUT, TimeUnit.SECONDS), is(true));
  211 +
  212 + //then
  213 + assertThat(handler.deviceSessionCtx.getMsgQueueSize().get(), is(0));
  214 + assertThat(handler.deviceSessionCtx.getMsgQueue(), empty());
  215 + verify(handler, times(MSG_QUEUE_LIMIT)).processRegularSessionMsg(any(), any());
  216 + messages.forEach((msg) -> verify(handler, times(1)).processRegularSessionMsg(ctx, msg));
  217 + }
  218 +
  219 +}
\ No newline at end of file
... ...