Commit 86fb9f076a0612dda52715d17944e7bb9af5203b
1 parent
831ddc43
Added test to validate timeseries with failures on downlinks
Showing
2 changed files
with
75 additions
and
7 deletions
... | ... | @@ -173,6 +173,8 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { |
173 | 173 | edgeImitator = new EdgeImitator("localhost", 7070, edge.getRoutingKey(), edge.getSecret()); |
174 | 174 | edgeImitator.expectMessageAmount(9); |
175 | 175 | edgeImitator.connect(); |
176 | + | |
177 | + testReceivedInitialData(); | |
176 | 178 | } |
177 | 179 | |
178 | 180 | @After |
... | ... | @@ -188,9 +190,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { |
188 | 190 | } |
189 | 191 | |
190 | 192 | @Test |
191 | - public void test() throws Exception { | |
192 | - testReceivedInitialData(); | |
193 | - | |
193 | + public void generalTest() throws Exception { | |
194 | 194 | testDevices(); |
195 | 195 | |
196 | 196 | testAssets(); |
... | ... | @@ -218,6 +218,55 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { |
218 | 218 | testRpcCall(); |
219 | 219 | } |
220 | 220 | |
221 | + @Test | |
222 | + public void testTimeseriesWithFailures() throws Exception { | |
223 | + log.info("Testing timeseries with failures"); | |
224 | + | |
225 | + int numberOfTimeseriesToSend = 1000; | |
226 | + | |
227 | + edgeImitator.setRandomFailuresOnTimeseriesDownlink(true); | |
228 | + // imitator will generate failure in 5% of cases | |
229 | + edgeImitator.setFailureProbability(5.0); | |
230 | + | |
231 | + edgeImitator.expectMessageAmount(numberOfTimeseriesToSend); | |
232 | + Device device = findDeviceByName("Edge Device 1"); | |
233 | + for (int idx = 1; idx <= numberOfTimeseriesToSend; idx++) { | |
234 | + String timeseriesData = "{\"data\":{\"idx\":" + idx + "},\"ts\":" + System.currentTimeMillis() + "}"; | |
235 | + JsonNode timeseriesEntityData = mapper.readTree(timeseriesData); | |
236 | + EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.TIMESERIES_UPDATED, | |
237 | + device.getId().getId(), EdgeEventType.DEVICE, timeseriesEntityData); | |
238 | + edgeEventService.saveAsync(edgeEvent); | |
239 | + clusterService.onEdgeEventUpdate(tenantId, edge.getId()); | |
240 | + } | |
241 | + | |
242 | + Assert.assertTrue(edgeImitator.waitForMessages(60)); | |
243 | + | |
244 | + List<EntityDataProto> allTelemetryMsgs = edgeImitator.findAllMessagesByType(EntityDataProto.class); | |
245 | + Assert.assertEquals(numberOfTimeseriesToSend, allTelemetryMsgs.size()); | |
246 | + | |
247 | + for (int idx = 1; idx <= numberOfTimeseriesToSend; idx++) { | |
248 | + Assert.assertTrue(isIdxExistsInTheDownlinkList(idx, allTelemetryMsgs)); | |
249 | + } | |
250 | + | |
251 | + edgeImitator.setRandomFailuresOnTimeseriesDownlink(false); | |
252 | + log.info("Timeseries with failures tested successfully"); | |
253 | + } | |
254 | + | |
255 | + private boolean isIdxExistsInTheDownlinkList(int idx, List<EntityDataProto> allTelemetryMsgs) { | |
256 | + for (EntityDataProto proto : allTelemetryMsgs) { | |
257 | + TransportProtos.PostTelemetryMsg postTelemetryMsg = proto.getPostTelemetryMsg(); | |
258 | + Assert.assertEquals(1, postTelemetryMsg.getTsKvListCount()); | |
259 | + TransportProtos.TsKvListProto tsKvListProto = postTelemetryMsg.getTsKvList(0); | |
260 | + Assert.assertEquals(1, tsKvListProto.getKvCount()); | |
261 | + TransportProtos.KeyValueProto keyValueProto = tsKvListProto.getKv(0); | |
262 | + Assert.assertEquals("idx", keyValueProto.getKey()); | |
263 | + if (keyValueProto.getLongV() == idx) { | |
264 | + return true; | |
265 | + } | |
266 | + } | |
267 | + return false; | |
268 | + } | |
269 | + | |
221 | 270 | private Device findDeviceByName(String deviceName) throws Exception { |
222 | 271 | List<Device> edgeDevices = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/devices?", |
223 | 272 | new TypeReference<PageData<Device>>() { | ... | ... |
... | ... | @@ -21,11 +21,11 @@ import com.google.common.util.concurrent.ListenableFuture; |
21 | 21 | import com.google.common.util.concurrent.MoreExecutors; |
22 | 22 | import com.google.protobuf.AbstractMessage; |
23 | 23 | import lombok.Getter; |
24 | +import lombok.Setter; | |
24 | 25 | import lombok.extern.slf4j.Slf4j; |
25 | 26 | import org.checkerframework.checker.nullness.qual.Nullable; |
26 | 27 | import org.thingsboard.edge.rpc.EdgeGrpcClient; |
27 | 28 | import org.thingsboard.edge.rpc.EdgeRpcClient; |
28 | -import org.thingsboard.server.common.data.id.UserId; | |
29 | 29 | import org.thingsboard.server.gen.edge.AlarmUpdateMsg; |
30 | 30 | import org.thingsboard.server.gen.edge.AssetUpdateMsg; |
31 | 31 | import org.thingsboard.server.gen.edge.CustomerUpdateMsg; |
... | ... | @@ -55,6 +55,7 @@ import java.util.ArrayList; |
55 | 55 | import java.util.List; |
56 | 56 | import java.util.Optional; |
57 | 57 | import java.util.concurrent.CountDownLatch; |
58 | +import java.util.concurrent.ThreadLocalRandom; | |
58 | 59 | import java.util.concurrent.TimeUnit; |
59 | 60 | import java.util.concurrent.locks.Lock; |
60 | 61 | import java.util.concurrent.locks.ReentrantLock; |
... | ... | @@ -74,6 +75,11 @@ public class EdgeImitator { |
74 | 75 | private CountDownLatch responsesLatch; |
75 | 76 | private List<Class<? extends AbstractMessage>> ignoredTypes; |
76 | 77 | |
78 | + @Setter | |
79 | + private boolean randomFailuresOnTimeseriesDownlink = false; | |
80 | + @Setter | |
81 | + private double failureProbability = 0.0; | |
82 | + | |
77 | 83 | @Getter |
78 | 84 | private EdgeConfiguration configuration; |
79 | 85 | @Getter |
... | ... | @@ -208,7 +214,15 @@ public class EdgeImitator { |
208 | 214 | } |
209 | 215 | if (downlinkMsg.getEntityDataCount() > 0) { |
210 | 216 | for (EntityDataProto entityData : downlinkMsg.getEntityDataList()) { |
211 | - result.add(saveDownlinkMsg(entityData)); | |
217 | + if (randomFailuresOnTimeseriesDownlink) { | |
218 | + if (getRandomBoolean()) { | |
219 | + result.add(Futures.immediateFailedFuture(new RuntimeException("Random failure"))); | |
220 | + } else { | |
221 | + result.add(saveDownlinkMsg(entityData)); | |
222 | + } | |
223 | + } else { | |
224 | + result.add(saveDownlinkMsg(entityData)); | |
225 | + } | |
212 | 226 | } |
213 | 227 | } |
214 | 228 | if (downlinkMsg.getEntityViewUpdateMsgCount() > 0) { |
... | ... | @@ -254,6 +268,11 @@ public class EdgeImitator { |
254 | 268 | return Futures.allAsList(result); |
255 | 269 | } |
256 | 270 | |
271 | + private boolean getRandomBoolean() { | |
272 | + double randomValue = ThreadLocalRandom.current().nextDouble() * 100; | |
273 | + return randomValue <= this.failureProbability; | |
274 | + } | |
275 | + | |
257 | 276 | private ListenableFuture<Void> saveDownlinkMsg(AbstractMessage message) { |
258 | 277 | if (!ignoredTypes.contains(message.getClass())) { |
259 | 278 | try { |
... | ... | @@ -271,8 +290,8 @@ public class EdgeImitator { |
271 | 290 | return waitForMessages(5); |
272 | 291 | } |
273 | 292 | |
274 | - public boolean waitForMessages(int timeout) throws InterruptedException { | |
275 | - return messagesLatch.await(timeout, TimeUnit.SECONDS); | |
293 | + public boolean waitForMessages(int timeoutInSeconds) throws InterruptedException { | |
294 | + return messagesLatch.await(timeoutInSeconds, TimeUnit.SECONDS); | |
276 | 295 | } |
277 | 296 | |
278 | 297 | public void expectMessageAmount(int messageAmount) { | ... | ... |