Commit 814fae23d2c3664838a21a2ab26ec4b206c3090c

Authored by 黄 x
1 parent 3852424a

feat: add mqtt process event

@@ -130,6 +130,10 @@ message SessionEventMsg { @@ -130,6 +130,10 @@ message SessionEventMsg {
130 SessionEvent event = 2; 130 SessionEvent event = 2;
131 } 131 }
132 132
  133 +//thingskit
  134 +message PostEventMsg {
  135 + repeated KeyValueProto kv = 1;
  136 +}
133 message PostTelemetryMsg { 137 message PostTelemetryMsg {
134 repeated TsKvListProto tsKvList = 1; 138 repeated TsKvListProto tsKvList = 1;
135 } 139 }
@@ -469,6 +473,8 @@ message TransportToRuleEngineMsg { @@ -469,6 +473,8 @@ message TransportToRuleEngineMsg {
469 PostAttributeMsg postAttributes = 3; 473 PostAttributeMsg postAttributes = 3;
470 ToDeviceRpcResponseMsg toDeviceRPCCallResponse = 4; 474 ToDeviceRpcResponseMsg toDeviceRPCCallResponse = 4;
471 ToServerRpcRequestMsg toServerRPCCallRequest = 5; 475 ToServerRpcRequestMsg toServerRPCCallRequest = 5;
  476 + //thingskit
  477 + PostEventMsg postEvent = 6;
472 } 478 }
473 479
474 /** 480 /**
@@ -51,6 +51,8 @@ public class MqttTopics { @@ -51,6 +51,8 @@ public class MqttTopics {
51 private static final String DEVICE_RPC_REQUEST_SHORT = RPC_SHORT + REQUEST_SHORT + "/"; 51 private static final String DEVICE_RPC_REQUEST_SHORT = RPC_SHORT + REQUEST_SHORT + "/";
52 private static final String DEVICE_ATTRIBUTES_RESPONSE = ATTRIBUTES_RESPONSE + "/"; 52 private static final String DEVICE_ATTRIBUTES_RESPONSE = ATTRIBUTES_RESPONSE + "/";
53 private static final String DEVICE_ATTRIBUTES_REQUEST = ATTRIBUTES_REQUEST + "/"; 53 private static final String DEVICE_ATTRIBUTES_REQUEST = ATTRIBUTES_REQUEST + "/";
  54 + // v1 thingskit event topic
  55 + public static final String BASE_DEVICE_EVENT_TOPIC = "v1/devices/event";
54 // v1 topics 56 // v1 topics
55 public static final String BASE_DEVICE_API_TOPIC = "v1/devices/me"; 57 public static final String BASE_DEVICE_API_TOPIC = "v1/devices/me";
56 public static final String DEVICE_RPC_RESPONSE_TOPIC = BASE_DEVICE_API_TOPIC + DEVICE_RPC_RESPONSE; 58 public static final String DEVICE_RPC_RESPONSE_TOPIC = BASE_DEVICE_API_TOPIC + DEVICE_RPC_RESPONSE;
@@ -86,6 +86,7 @@ public enum ErrorMessage { @@ -86,6 +86,7 @@ public enum ErrorMessage {
86 RULE_CHAIN_NOT_ENABLE(400062,"规则链不是有效的!"), 86 RULE_CHAIN_NOT_ENABLE(400062,"规则链不是有效的!"),
87 DUPLICATE_IDENTIFIERS_EXIST(400063,"存在重复的功能标识符。"), 87 DUPLICATE_IDENTIFIERS_EXIST(400063,"存在重复的功能标识符。"),
88 SMS_CONFIG_ERROR(400064,"短信配置错误。"), 88 SMS_CONFIG_ERROR(400064,"短信配置错误。"),
  89 + INVALID_TOPIC(400065,"无效Topic。"),
89 HAVE_NO_PERMISSION(500002,"没有修改权限"); 90 HAVE_NO_PERMISSION(500002,"没有修改权限");
90 private final int code; 91 private final int code;
91 private String message; 92 private String message;
  1 +package org.thingsboard.server.common.data.yunteng.enums;
  2 +
  3 +public enum TkEventType {
  4 + INFO,ALARM,ERROR
  5 +}
@@ -445,7 +445,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -445,7 +445,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
445 TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX); 445 TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX);
446 transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg)); 446 transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
447 attrReqTopicType = TopicType.V2; 447 attrReqTopicType = TopicType.V2;
448 - } else { 448 + } else if(topicName.startsWith(MqttTopics.BASE_DEVICE_EVENT_TOPIC)){
  449 + // thingskit
  450 + TransportProtos.PostEventMsg postEventMsgMsg = payloadAdaptor.convertToPostEvent(deviceSessionCtx, mqttMsg);
  451 + transportService.process(deviceSessionCtx.getSessionInfo(), postEventMsgMsg, getPubAckCallback(ctx, msgId, postEventMsgMsg),topicName);
  452 + } else{
449 transportService.reportActivity(deviceSessionCtx.getSessionInfo()); 453 transportService.reportActivity(deviceSessionCtx.getSessionInfo());
450 ack(ctx, msgId); 454 ack(ctx, msgId);
451 } 455 }
@@ -54,6 +54,16 @@ public class BackwardCompatibilityAdaptor implements MqttTransportAdaptor { @@ -54,6 +54,16 @@ public class BackwardCompatibilityAdaptor implements MqttTransportAdaptor {
54 return jsonAdaptor.convertToPostAttributes(ctx, inbound); 54 return jsonAdaptor.convertToPostAttributes(ctx, inbound);
55 } 55 }
56 } 56 }
  57 + //thingskit
  58 + @Override
  59 + public TransportProtos.PostEventMsg convertToPostEvent(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
  60 + try {
  61 + return protoAdaptor.convertToPostEvent(ctx, inbound);
  62 + } catch (AdaptorException e) {
  63 + log.trace("[{}] failed to process post event request msg: {} due to: ", ctx.getSessionId(), inbound, e);
  64 + return jsonAdaptor.convertToPostEvent(ctx, inbound);
  65 + }
  66 + }
57 67
58 @Override 68 @Override
59 public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException { 69 public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
@@ -77,6 +77,18 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @@ -77,6 +77,18 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
77 } 77 }
78 } 78 }
79 79
  80 + //thingskit
  81 + @Override
  82 + public TransportProtos.PostEventMsg convertToPostEvent(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
  83 + String payload = validatePayload(ctx.getSessionId(), inbound.payload(), false);
  84 + try {
  85 + return JsonConverter.convertToEventProto(new JsonParser().parse(payload));
  86 + } catch (IllegalStateException | JsonSyntaxException ex) {
  87 + log.debug("Failed to decode post event request", ex);
  88 + throw new AdaptorException(ex);
  89 + }
  90 + }
  91 +
80 @Override 92 @Override
81 public TransportProtos.ClaimDeviceMsg convertToClaimDevice(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { 93 public TransportProtos.ClaimDeviceMsg convertToClaimDevice(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
82 String payload = validatePayload(ctx.getSessionId(), inbound.payload(), true); 94 String payload = validatePayload(ctx.getSessionId(), inbound.payload(), true);
@@ -25,6 +25,7 @@ import io.netty.handler.codec.mqtt.MqttPublishMessage; @@ -25,6 +25,7 @@ import io.netty.handler.codec.mqtt.MqttPublishMessage;
25 import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; 25 import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
26 import org.thingsboard.server.common.data.ota.OtaPackageType; 26 import org.thingsboard.server.common.data.ota.OtaPackageType;
27 import org.thingsboard.server.common.transport.adaptor.AdaptorException; 27 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
  28 +import org.thingsboard.server.gen.transport.TransportProtos;
28 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; 29 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
29 import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; 30 import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
30 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; 31 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
@@ -51,6 +52,8 @@ public interface MqttTransportAdaptor { @@ -51,6 +52,8 @@ public interface MqttTransportAdaptor {
51 PostTelemetryMsg convertToPostTelemetry(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException; 52 PostTelemetryMsg convertToPostTelemetry(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;
52 53
53 PostAttributeMsg convertToPostAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException; 54 PostAttributeMsg convertToPostAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;
  55 + //thingskit
  56 + TransportProtos.PostEventMsg convertToPostEvent(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;
54 57
55 GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException; 58 GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException;
56 59
@@ -70,6 +70,20 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor { @@ -70,6 +70,20 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
70 } 70 }
71 } 71 }
72 72
  73 + //thingskit
  74 + @Override
  75 + public TransportProtos.PostEventMsg convertToPostEvent(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
  76 + DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx;
  77 + byte[] bytes = toBytes(inbound.payload());
  78 + Descriptors.Descriptor attributesDynamicMessageDescriptor = ProtoConverter.validateDescriptor(deviceSessionCtx.getAttributesDynamicMessageDescriptor());
  79 + try {
  80 + return JsonConverter.convertToEventProto(new JsonParser().parse(ProtoConverter.dynamicMsgToJson(bytes, attributesDynamicMessageDescriptor)));
  81 + } catch (Exception e) {
  82 + log.debug("Failed to decode post event request", e);
  83 + throw new AdaptorException(e);
  84 + }
  85 + }
  86 +
73 @Override 87 @Override
74 public TransportProtos.ClaimDeviceMsg convertToClaimDevice(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { 88 public TransportProtos.ClaimDeviceMsg convertToClaimDevice(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
75 byte[] bytes = toBytes(inbound.payload()); 89 byte[] bytes = toBytes(inbound.payload());
@@ -41,6 +41,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.LwM2MRequestMsg; @@ -41,6 +41,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.LwM2MRequestMsg;
41 import org.thingsboard.server.gen.transport.TransportProtos.LwM2MResponseMsg; 41 import org.thingsboard.server.gen.transport.TransportProtos.LwM2MResponseMsg;
42 import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg; 42 import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
43 import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg; 43 import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
  44 +import org.thingsboard.server.gen.transport.TransportProtos.PostEventMsg;
44 import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg; 45 import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg;
45 import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg; 46 import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg;
46 import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg; 47 import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
@@ -104,6 +105,9 @@ public interface TransportService { @@ -104,6 +105,9 @@ public interface TransportService {
104 105
105 void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback); 106 void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback);
106 107
  108 + //thingskit
  109 + void process(SessionInfoProto sessionInfo, PostEventMsg msg, TransportServiceCallback<Void> callback,String topicName);
  110 +
107 void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback); 111 void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback);
108 112
109 void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback); 113 void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback);
@@ -154,6 +154,18 @@ public class JsonConverter { @@ -154,6 +154,18 @@ public class JsonConverter {
154 } 154 }
155 } 155 }
156 156
  157 + //thingskit
  158 + public static TransportProtos.PostEventMsg convertToEventProto(JsonElement jsonObject) throws JsonSyntaxException {
  159 + if (jsonObject.isJsonObject()) {
  160 + TransportProtos.PostEventMsg.Builder result = TransportProtos.PostEventMsg.newBuilder();
  161 + List<KeyValueProto> keyValueList = parseProtoValues(jsonObject.getAsJsonObject());
  162 + result.addAllKv(keyValueList);
  163 + return result.build();
  164 + } else {
  165 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + jsonObject);
  166 + }
  167 + }
  168 +
157 public static JsonElement toJson(TransportProtos.ToDeviceRpcRequestMsg msg, boolean includeRequestId) { 169 public static JsonElement toJson(TransportProtos.ToDeviceRpcRequestMsg msg, boolean includeRequestId) {
158 //Thingskit function 170 //Thingskit function
159 JsonObject result = null; 171 JsonObject result = null;
@@ -38,6 +38,7 @@ import org.thingsboard.server.common.data.ResourceType; @@ -38,6 +38,7 @@ import org.thingsboard.server.common.data.ResourceType;
38 import org.thingsboard.server.common.data.StringUtils; 38 import org.thingsboard.server.common.data.StringUtils;
39 import org.thingsboard.server.common.data.Tenant; 39 import org.thingsboard.server.common.data.Tenant;
40 import org.thingsboard.server.common.data.device.data.PowerMode; 40 import org.thingsboard.server.common.data.device.data.PowerMode;
  41 +import org.thingsboard.server.common.data.device.profile.MqttTopics;
41 import org.thingsboard.server.common.data.id.CustomerId; 42 import org.thingsboard.server.common.data.id.CustomerId;
42 import org.thingsboard.server.common.data.id.DeviceId; 43 import org.thingsboard.server.common.data.id.DeviceId;
43 import org.thingsboard.server.common.data.id.DeviceProfileId; 44 import org.thingsboard.server.common.data.id.DeviceProfileId;
@@ -46,6 +47,8 @@ import org.thingsboard.server.common.data.id.RuleChainId; @@ -46,6 +47,8 @@ import org.thingsboard.server.common.data.id.RuleChainId;
46 import org.thingsboard.server.common.data.id.TenantId; 47 import org.thingsboard.server.common.data.id.TenantId;
47 import org.thingsboard.server.common.data.id.TenantProfileId; 48 import org.thingsboard.server.common.data.id.TenantProfileId;
48 import org.thingsboard.server.common.data.rpc.RpcStatus; 49 import org.thingsboard.server.common.data.rpc.RpcStatus;
  50 +import org.thingsboard.server.common.data.yunteng.core.exception.TkDataValidationException;
  51 +import org.thingsboard.server.common.data.yunteng.core.message.ErrorMessage;
49 import org.thingsboard.server.common.msg.TbMsg; 52 import org.thingsboard.server.common.msg.TbMsg;
50 import org.thingsboard.server.common.msg.TbMsgMetaData; 53 import org.thingsboard.server.common.msg.TbMsgMetaData;
51 import org.thingsboard.server.common.msg.queue.ServiceQueue; 54 import org.thingsboard.server.common.msg.queue.ServiceQueue;
@@ -569,6 +572,32 @@ public class DefaultTransportService implements TransportService { @@ -569,6 +572,32 @@ public class DefaultTransportService implements TransportService {
569 new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, customerId, msg.getKvList().size(), callback))); 572 new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, customerId, msg.getKvList().size(), callback)));
570 } 573 }
571 } 574 }
  575 + //thingskit
  576 + @Override
  577 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostEventMsg msg,
  578 + TransportServiceCallback<Void> callback,String topicName) {
  579 + //Topic地址:v1/devices/event/{${identifier}}/{$eventType}
  580 + String[] topicInfo = topicName.split(MqttTopics.BASE_DEVICE_EVENT_TOPIC);
  581 + if(null == topicInfo || topicInfo.length !=2){
  582 + throw new TkDataValidationException(ErrorMessage.INVALID_TOPIC.getMessage());
  583 + }
  584 + String[] eventInfo = topicInfo[1].split("/");
  585 + if(null == eventInfo || eventInfo.length !=2){
  586 + throw new TkDataValidationException(ErrorMessage.INVALID_TOPIC.getMessage());
  587 + }
  588 + reportActivityInternal(sessionInfo);
  589 + JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
  590 + TbMsgMetaData metaData = new TbMsgMetaData();
  591 + metaData.putValue("deviceName", sessionInfo.getDeviceName());
  592 + metaData.putValue("deviceType", sessionInfo.getDeviceType());
  593 + metaData.putValue("event_identifier", eventInfo[0]);
  594 + metaData.putValue("event_type",eventInfo[1]);
  595 + CustomerId customerId = getCustomerId(sessionInfo);
  596 + TenantId tenantId = getTenantId(sessionInfo);
  597 + DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
  598 + sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_EVENT_REQUEST,
  599 + new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, customerId, msg.getKvList().size(), callback)));
  600 + }
572 601
573 @Override 602 @Override
574 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) { 603 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {