Commit 859c1f8e9ddd99596b7ceb84ec9d5d75075cd472

Authored by Andrew Shvayka
1 parent f93dc905

TB-66: Implementation

... ... @@ -37,6 +37,7 @@ import org.thingsboard.server.common.transport.adaptor.AdaptorException;
37 37 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
38 38 import org.thingsboard.server.dao.EncryptionUtil;
39 39 import org.thingsboard.server.dao.device.DeviceService;
  40 +import org.thingsboard.server.dao.relation.RelationService;
40 41 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
41 42 import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
42 43 import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
... ... @@ -67,14 +68,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
67 68 private final SessionMsgProcessor processor;
68 69 private final DeviceService deviceService;
69 70 private final DeviceAuthService authService;
  71 + private final RelationService relationService;
70 72 private final SslHandler sslHandler;
71 73 private volatile boolean connected;
72 74 private volatile GatewaySessionCtx gatewaySessionCtx;
73 75
74   - public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService,
  76 + public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
75 77 MqttTransportAdaptor adaptor, SslHandler sslHandler) {
76 78 this.processor = processor;
77 79 this.deviceService = deviceService;
  80 + this.relationService = relationService;
78 81 this.authService = authService;
79 82 this.adaptor = adaptor;
80 83 this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor);
... ... @@ -371,7 +374,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
371 374 if (infoNode != null) {
372 375 JsonNode gatewayNode = infoNode.get("gateway");
373 376 if (gatewayNode != null && gatewayNode.asBoolean()) {
374   - gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, deviceSessionCtx);
  377 + gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, relationService, deviceSessionCtx);
375 378 }
376 379 }
377 380 }
... ...
... ... @@ -30,6 +30,7 @@ import org.springframework.beans.factory.annotation.Value;
30 30 import org.thingsboard.server.common.transport.SessionMsgProcessor;
31 31 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
32 32 import org.thingsboard.server.dao.device.DeviceService;
  33 +import org.thingsboard.server.dao.relation.RelationService;
33 34 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
34 35
35 36 import javax.net.ssl.SSLException;
... ... @@ -45,14 +46,17 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha
45 46 private final SessionMsgProcessor processor;
46 47 private final DeviceService deviceService;
47 48 private final DeviceAuthService authService;
  49 + private final RelationService relationService;
48 50 private final MqttTransportAdaptor adaptor;
49 51 private final MqttSslHandlerProvider sslHandlerProvider;
50 52
51   - public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, MqttTransportAdaptor adaptor,
  53 + public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
  54 + MqttTransportAdaptor adaptor,
52 55 MqttSslHandlerProvider sslHandlerProvider) {
53 56 this.processor = processor;
54 57 this.deviceService = deviceService;
55 58 this.authService = authService;
  59 + this.relationService = relationService;
56 60 this.adaptor = adaptor;
57 61 this.sslHandlerProvider = sslHandlerProvider;
58 62 }
... ... @@ -68,7 +72,7 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha
68 72 pipeline.addLast("decoder", new MqttDecoder(MAX_PAYLOAD_SIZE));
69 73 pipeline.addLast("encoder", MqttEncoder.INSTANCE);
70 74
71   - MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, adaptor, sslHandler);
  75 + MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, relationService, adaptor, sslHandler);
72 76 pipeline.addLast(handler);
73 77 ch.closeFuture().addListener(handler);
74 78 }
... ...
... ... @@ -29,6 +29,7 @@ import org.springframework.stereotype.Service;
29 29 import org.thingsboard.server.common.transport.SessionMsgProcessor;
30 30 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
31 31 import org.thingsboard.server.dao.device.DeviceService;
  32 +import org.thingsboard.server.dao.relation.RelationService;
32 33 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
33 34
34 35 import javax.annotation.PostConstruct;
... ... @@ -57,6 +58,9 @@ public class MqttTransportService {
57 58 private DeviceAuthService authService;
58 59
59 60 @Autowired(required = false)
  61 + private RelationService relationService;
  62 +
  63 + @Autowired(required = false)
60 64 private MqttSslHandlerProvider sslHandlerProvider;
61 65
62 66 @Value("${mqtt.bind_address}")
... ... @@ -95,7 +99,7 @@ public class MqttTransportService {
95 99 ServerBootstrap b = new ServerBootstrap();
96 100 b.group(bossGroup, workerGroup)
97 101 .channel(NioServerSocketChannel.class)
98   - .childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, adaptor, sslHandlerProvider));
  102 + .childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, relationService, adaptor, sslHandlerProvider));
99 103
100 104 serverChannel = b.bind(host, port).sync().channel();
101 105 log.info("Mqtt transport started!");
... ...
... ... @@ -27,6 +27,7 @@ import org.springframework.util.StringUtils;
27 27 import org.thingsboard.server.common.data.Device;
28 28 import org.thingsboard.server.common.data.id.SessionId;
29 29 import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
  30 +import org.thingsboard.server.common.data.relation.EntityRelation;
30 31 import org.thingsboard.server.common.msg.core.*;
31 32 import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
32 33 import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
... ... @@ -36,6 +37,7 @@ import org.thingsboard.server.common.transport.adaptor.AdaptorException;
36 37 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
37 38 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
38 39 import org.thingsboard.server.dao.device.DeviceService;
  40 +import org.thingsboard.server.dao.relation.RelationService;
39 41 import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
40 42 import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
41 43
... ... @@ -58,28 +60,34 @@ public class GatewaySessionCtx {
58 60 private final SessionMsgProcessor processor;
59 61 private final DeviceService deviceService;
60 62 private final DeviceAuthService authService;
  63 + private final RelationService relationService;
61 64 private final Map<String, GatewayDeviceSessionCtx> devices;
62 65 private ChannelHandlerContext channel;
63 66
64   - public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, DeviceSessionCtx gatewaySessionCtx) {
  67 + public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) {
65 68 this.processor = processor;
66 69 this.deviceService = deviceService;
67 70 this.authService = authService;
  71 + this.relationService = relationService;
68 72 this.gateway = gatewaySessionCtx.getDevice();
69 73 this.gatewaySessionId = gatewaySessionCtx.getSessionId();
70 74 this.devices = new HashMap<>();
71 75 }
72 76
73 77 public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException {
74   - String deviceName = checkDeviceName(getDeviceName(msg));
  78 + JsonElement json = getJson(msg);
  79 + String deviceName = checkDeviceName(getDeviceName(json));
  80 + String deviceType = getDeviceType(json);
75 81 if (!devices.containsKey(deviceName)) {
76 82 Optional<Device> deviceOpt = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceName);
77 83 Device device = deviceOpt.orElseGet(() -> {
78 84 Device newDevice = new Device();
79 85 newDevice.setTenantId(gateway.getTenantId());
80 86 newDevice.setName(deviceName);
81   - newDevice.setType("default");
82   - return deviceService.saveDevice(newDevice);
  87 + newDevice.setType(deviceType);
  88 + newDevice = deviceService.saveDevice(newDevice);
  89 + relationService.saveRelation(new EntityRelation(gateway.getId(), newDevice.getId(), "Created"));
  90 + return newDevice;
83 91 });
84 92 GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device);
85 93 devices.put(deviceName, ctx);
... ... @@ -91,7 +99,7 @@ public class GatewaySessionCtx {
91 99 }
92 100
93 101 public void onDeviceDisconnect(MqttPublishMessage msg) throws AdaptorException {
94   - String deviceName = checkDeviceName(getDeviceName(msg));
  102 + String deviceName = checkDeviceName(getDeviceName(getJson(msg)));
95 103 GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName);
96 104 if (deviceSessionCtx != null) {
97 105 processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId()));
... ... @@ -211,11 +219,19 @@ public class GatewaySessionCtx {
211 219 }
212 220 }
213 221
214   - private String getDeviceName(MqttPublishMessage mqttMsg) throws AdaptorException {
215   - JsonElement json = JsonMqttAdaptor.validateJsonPayload(gatewaySessionId, mqttMsg.payload());
  222 + private String getDeviceName(JsonElement json) throws AdaptorException {
216 223 return json.getAsJsonObject().get("device").getAsString();
217 224 }
218 225
  226 + private String getDeviceType(JsonElement json) throws AdaptorException {
  227 + JsonElement type = json.getAsJsonObject().get("type");
  228 + return type == null ? "default" : type.getAsString();
  229 + }
  230 +
  231 + private JsonElement getJson(MqttPublishMessage mqttMsg) throws AdaptorException {
  232 + return JsonMqttAdaptor.validateJsonPayload(gatewaySessionId, mqttMsg.payload());
  233 + }
  234 +
219 235 protected SessionMsgProcessor getProcessor() {
220 236 return processor;
221 237 }
... ... @@ -229,7 +245,9 @@ public class GatewaySessionCtx {
229 245 }
230 246
231 247 private void ack(MqttPublishMessage msg) {
232   - writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId()));
  248 + if(msg.variableHeader().messageId() > 0) {
  249 + writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId()));
  250 + }
233 251 }
234 252
235 253 protected void writeAndFlush(MqttMessage mqttMessage) {
... ...
... ... @@ -18,9 +18,7 @@ package org.thingsboard.server.transport.mqtt.util;
18 18 import lombok.extern.slf4j.Slf4j;
19 19 import org.springframework.util.Base64Utils;
20 20 import org.thingsboard.server.dao.EncryptionUtil;
21   -import sun.misc.BASE64Encoder;
22 21
23   -import java.io.ByteArrayOutputStream;
24 22 import java.io.IOException;
25 23 import java.security.cert.CertificateEncodingException;
26 24 import java.security.cert.X509Certificate;
... ...