Commit 321fb731a6537399a6e4572837967fe41cdaf58a

Authored by ShvaykaD
Committed by GitHub
1 parent 51b0d505

Support of server-side RPC requests via protobuf for CoAP and MQTT

* rpc response schema added

* rpc request proto

* fixed device profile validation

* fix error messages
Showing 32 changed files with 568 additions and 165 deletions
... ... @@ -15,35 +15,11 @@
15 15 */
16 16 package org.thingsboard.server.transport;
17 17
18   -import com.fasterxml.jackson.databind.node.ObjectNode;
19 18 import lombok.extern.slf4j.Slf4j;
20   -import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
21   -import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
22   -import org.eclipse.paho.client.mqttv3.MqttException;
23   -import org.eclipse.paho.client.mqttv3.MqttMessage;
24   -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
25   -import org.junit.Assert;
26   -import org.springframework.util.StringUtils;
27 19 import org.thingsboard.server.common.data.Device;
28 20 import org.thingsboard.server.common.data.DeviceProfile;
29   -import org.thingsboard.server.common.data.DeviceProfileProvisionType;
30   -import org.thingsboard.server.common.data.DeviceProfileType;
31   -import org.thingsboard.server.common.data.DeviceTransportType;
32 21 import org.thingsboard.server.common.data.Tenant;
33   -import org.thingsboard.server.common.data.TransportPayloadType;
34 22 import org.thingsboard.server.common.data.User;
35   -import org.thingsboard.server.common.data.device.profile.AllowCreateNewDevicesDeviceProfileProvisionConfiguration;
36   -import org.thingsboard.server.common.data.device.profile.CheckPreProvisionedDevicesDeviceProfileProvisionConfiguration;
37   -import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileConfiguration;
38   -import org.thingsboard.server.common.data.device.profile.DeviceProfileData;
39   -import org.thingsboard.server.common.data.device.profile.DeviceProfileProvisionConfiguration;
40   -import org.thingsboard.server.common.data.device.profile.DisabledDeviceProfileProvisionConfiguration;
41   -import org.thingsboard.server.common.data.device.profile.JsonTransportPayloadConfiguration;
42   -import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
43   -import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration;
44   -import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
45   -import org.thingsboard.server.common.data.security.Authority;
46   -import org.thingsboard.server.common.data.security.DeviceCredentials;
47 23 import org.thingsboard.server.controller.AbstractControllerTest;
48 24 import org.thingsboard.server.gen.transport.TransportProtos;
49 25
... ... @@ -51,8 +27,6 @@ import java.util.ArrayList;
51 27 import java.util.List;
52 28 import java.util.concurrent.atomic.AtomicInteger;
53 29
54   -import static org.junit.Assert.assertEquals;
55   -import static org.junit.Assert.assertNotNull;
56 30 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
57 31
58 32 @Slf4j
... ... @@ -105,6 +79,22 @@ public abstract class AbstractTransportIntegrationTest extends AbstractControlle
105 79 " }\n" +
106 80 "}";
107 81
  82 + protected static final String DEVICE_RPC_RESPONSE_PROTO_SCHEMA = "syntax =\"proto3\";\n" +
  83 + "package rpc;\n" +
  84 + "\n" +
  85 + "message RpcResponseMsg {\n" +
  86 + " string payload = 1;\n" +
  87 + "}";
  88 +
  89 + protected static final String DEVICE_RPC_REQUEST_PROTO_SCHEMA = "syntax =\"proto3\";\n" +
  90 + "package rpc;\n" +
  91 + "\n" +
  92 + "message RpcRequestMsg {\n" +
  93 + " string method = 1;\n" +
  94 + " int32 requestId = 2;\n" +
  95 + " string params = 3;\n" +
  96 + "}";
  97 +
108 98 protected Tenant savedTenant;
109 99 protected User tenantAdmin;
110 100
... ...
... ... @@ -53,7 +53,7 @@ import static org.junit.Assert.assertNotNull;
53 53 public abstract class AbstractCoapIntegrationTest extends AbstractTransportIntegrationTest {
54 54
55 55 protected void processBeforeTest(String deviceName, CoapDeviceType coapDeviceType, TransportPayloadType payloadType) throws Exception {
56   - this.processBeforeTest(deviceName, coapDeviceType, payloadType, null, null, DeviceProfileProvisionType.DISABLED, null, null);
  56 + this.processBeforeTest(deviceName, coapDeviceType, payloadType, null, null, null, null, null, null, DeviceProfileProvisionType.DISABLED);
57 57 }
58 58
59 59 protected void processBeforeTest(String deviceName,
... ... @@ -61,8 +61,11 @@ public abstract class AbstractCoapIntegrationTest extends AbstractTransportInteg
61 61 TransportPayloadType payloadType,
62 62 String telemetryProtoSchema,
63 63 String attributesProtoSchema,
64   - DeviceProfileProvisionType provisionType,
65   - String provisionKey, String provisionSecret
  64 + String rpcResponseProtoSchema,
  65 + String rpcRequestProtoSchema,
  66 + String provisionKey,
  67 + String provisionSecret,
  68 + DeviceProfileProvisionType provisionType
66 69 ) throws Exception {
67 70 loginSysAdmin();
68 71
... ... @@ -85,7 +88,7 @@ public abstract class AbstractCoapIntegrationTest extends AbstractTransportInteg
85 88 device.setType("default");
86 89
87 90 if (coapDeviceType != null) {
88   - DeviceProfile coapDeviceProfile = createCoapDeviceProfile(payloadType, coapDeviceType, attributesProtoSchema, provisionType, provisionKey, provisionSecret, telemetryProtoSchema);
  91 + DeviceProfile coapDeviceProfile = createCoapDeviceProfile(payloadType, coapDeviceType, provisionSecret, provisionType, provisionKey, attributesProtoSchema, telemetryProtoSchema, rpcResponseProtoSchema, rpcRequestProtoSchema);
89 92 deviceProfile = doPost("/api/deviceProfile", coapDeviceProfile, DeviceProfile.class);
90 93 device.setType(deviceProfile.getName());
91 94 device.setDeviceProfileId(deviceProfile.getId());
... ... @@ -103,8 +106,9 @@ public abstract class AbstractCoapIntegrationTest extends AbstractTransportInteg
103 106 }
104 107
105 108 protected DeviceProfile createCoapDeviceProfile(TransportPayloadType transportPayloadType, CoapDeviceType coapDeviceType,
106   - String attributesProtoSchema, DeviceProfileProvisionType provisionType,
107   - String provisionKey, String provisionSecret, String telemetryProtoSchema) {
  109 + String provisionSecret, DeviceProfileProvisionType provisionType,
  110 + String provisionKey, String attributesProtoSchema,
  111 + String telemetryProtoSchema, String rpcResponseProtoSchema, String rpcRequestProtoSchema) {
108 112 DeviceProfile deviceProfile = new DeviceProfile();
109 113 deviceProfile.setName(transportPayloadType.name());
110 114 deviceProfile.setType(DeviceProfileType.DEFAULT);
... ... @@ -127,8 +131,16 @@ public abstract class AbstractCoapIntegrationTest extends AbstractTransportInteg
127 131 if (StringUtils.isEmpty(attributesProtoSchema)) {
128 132 attributesProtoSchema = DEVICE_ATTRIBUTES_PROTO_SCHEMA;
129 133 }
  134 + if (StringUtils.isEmpty(rpcResponseProtoSchema)) {
  135 + rpcResponseProtoSchema = DEVICE_RPC_RESPONSE_PROTO_SCHEMA;
  136 + }
  137 + if (StringUtils.isEmpty(rpcRequestProtoSchema)) {
  138 + rpcRequestProtoSchema = DEVICE_RPC_REQUEST_PROTO_SCHEMA;
  139 + }
130 140 protoTransportPayloadConfiguration.setDeviceTelemetryProtoSchema(telemetryProtoSchema);
131 141 protoTransportPayloadConfiguration.setDeviceAttributesProtoSchema(attributesProtoSchema);
  142 + protoTransportPayloadConfiguration.setDeviceRpcResponseProtoSchema(rpcResponseProtoSchema);
  143 + protoTransportPayloadConfiguration.setDeviceRpcRequestProtoSchema(rpcRequestProtoSchema);
132 144 transportPayloadTypeConfiguration = protoTransportPayloadConfiguration;
133 145 } else {
134 146 transportPayloadTypeConfiguration = new JsonTransportPayloadConfiguration();
... ...
... ... @@ -79,7 +79,7 @@ public abstract class AbstractCoapAttributesRequestProtoIntegrationTest extends
79 79 @Test
80 80 public void testRequestAttributesValuesFromTheServer() throws Exception {
81 81 super.processBeforeTest("Test Request attribute values from the server proto", CoapDeviceType.DEFAULT,
82   - TransportPayloadType.PROTOBUF, null, ATTRIBUTES_SCHEMA_STR, DeviceProfileProvisionType.DISABLED, null, null);
  82 + TransportPayloadType.PROTOBUF, null, ATTRIBUTES_SCHEMA_STR, null, null, null, null, DeviceProfileProvisionType.DISABLED);
83 83 processTestRequestAttributesValuesFromTheServer();
84 84 }
85 85
... ...
... ... @@ -88,7 +88,7 @@ public abstract class AbstractCoapProvisionJsonDeviceTest extends AbstractCoapIn
88 88
89 89
90 90 private void processTestProvisioningDisabledDevice() throws Exception {
91   - super.processBeforeTest("Test Provision device", CoapDeviceType.DEFAULT, TransportPayloadType.JSON, null, null, DeviceProfileProvisionType.DISABLED, null, null);
  91 + super.processBeforeTest("Test Provision device", CoapDeviceType.DEFAULT, TransportPayloadType.JSON, null, null, null, null, null, null, DeviceProfileProvisionType.DISABLED);
92 92 byte[] result = createCoapClientAndPublish().getPayload();
93 93 JsonObject response = JsonUtils.parse(new String(result)).getAsJsonObject();
94 94 Assert.assertEquals("Provision data was not found!", response.get("errorMsg").getAsString());
... ... @@ -97,7 +97,7 @@ public abstract class AbstractCoapProvisionJsonDeviceTest extends AbstractCoapIn
97 97
98 98
99 99 private void processTestProvisioningCreateNewDeviceWithoutCredentials() throws Exception {
100   - super.processBeforeTest("Test Provision device3", CoapDeviceType.DEFAULT, TransportPayloadType.JSON, null, null, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES, "testProvisionKey", "testProvisionSecret");
  100 + super.processBeforeTest("Test Provision device3", CoapDeviceType.DEFAULT, TransportPayloadType.JSON, null, null, null, null, "testProvisionKey", "testProvisionSecret", DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES);
101 101 byte[] result = createCoapClientAndPublish().getPayload();
102 102 JsonObject response = JsonUtils.parse(new String(result)).getAsJsonObject();
103 103
... ... @@ -113,7 +113,7 @@ public abstract class AbstractCoapProvisionJsonDeviceTest extends AbstractCoapIn
113 113
114 114
115 115 private void processTestProvisioningCreateNewDeviceWithAccessToken() throws Exception {
116   - super.processBeforeTest("Test Provision device3", CoapDeviceType.DEFAULT, TransportPayloadType.JSON, null, null, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES, "testProvisionKey", "testProvisionSecret");
  116 + super.processBeforeTest("Test Provision device3", CoapDeviceType.DEFAULT, TransportPayloadType.JSON, null, null, null, null, "testProvisionKey", "testProvisionSecret", DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES);
117 117 String requestCredentials = ",\"credentialsType\": \"ACCESS_TOKEN\",\"token\": \"test_token\"";
118 118 byte[] result = createCoapClientAndPublish(requestCredentials).getPayload();
119 119 JsonObject response = JsonUtils.parse(new String(result)).getAsJsonObject();
... ... @@ -132,7 +132,7 @@ public abstract class AbstractCoapProvisionJsonDeviceTest extends AbstractCoapIn
132 132
133 133
134 134 private void processTestProvisioningCreateNewDeviceWithCert() throws Exception {
135   - super.processBeforeTest("Test Provision device3", CoapDeviceType.DEFAULT, TransportPayloadType.JSON, null, null, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES, "testProvisionKey", "testProvisionSecret");
  135 + super.processBeforeTest("Test Provision device3", CoapDeviceType.DEFAULT, TransportPayloadType.JSON, null, null, null, null, "testProvisionKey", "testProvisionSecret", DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES);
136 136 String requestCredentials = ",\"credentialsType\": \"X509_CERTIFICATE\",\"hash\": \"testHash\"";
137 137 byte[] result = createCoapClientAndPublish(requestCredentials).getPayload();
138 138 JsonObject response = JsonUtils.parse(new String(result)).getAsJsonObject();
... ... @@ -156,7 +156,7 @@ public abstract class AbstractCoapProvisionJsonDeviceTest extends AbstractCoapIn
156 156 }
157 157
158 158 private void processTestProvisioningCheckPreProvisionedDevice() throws Exception {
159   - super.processBeforeTest("Test Provision device", CoapDeviceType.DEFAULT, TransportPayloadType.JSON, null, null, DeviceProfileProvisionType.CHECK_PRE_PROVISIONED_DEVICES, "testProvisionKey", "testProvisionSecret");
  159 + super.processBeforeTest("Test Provision device", CoapDeviceType.DEFAULT, TransportPayloadType.JSON, null, null, null, null, "testProvisionKey", "testProvisionSecret", DeviceProfileProvisionType.CHECK_PRE_PROVISIONED_DEVICES);
160 160 byte[] result = createCoapClientAndPublish().getPayload();
161 161 JsonObject response = JsonUtils.parse(new String(result)).getAsJsonObject();
162 162
... ... @@ -167,7 +167,7 @@ public abstract class AbstractCoapProvisionJsonDeviceTest extends AbstractCoapIn
167 167 }
168 168
169 169 private void processTestProvisioningWithBadKeyDevice() throws Exception {
170   - super.processBeforeTest("Test Provision device", CoapDeviceType.DEFAULT, TransportPayloadType.JSON, null, null, DeviceProfileProvisionType.CHECK_PRE_PROVISIONED_DEVICES, "testProvisionKeyOrig", "testProvisionSecret");
  170 + super.processBeforeTest("Test Provision device", CoapDeviceType.DEFAULT, TransportPayloadType.JSON, null, null, null, null, "testProvisionKeyOrig", "testProvisionSecret", DeviceProfileProvisionType.CHECK_PRE_PROVISIONED_DEVICES);
171 171 byte[] result = createCoapClientAndPublish().getPayload();
172 172 JsonObject response = JsonUtils.parse(new String(result)).getAsJsonObject();
173 173 Assert.assertEquals("Provision data was not found!", response.get("errorMsg").getAsString());
... ...
... ... @@ -92,14 +92,14 @@ public abstract class AbstractCoapProvisionProtoDeviceTest extends AbstractCoapI
92 92
93 93
94 94 private void processTestProvisioningDisabledDevice() throws Exception {
95   - super.processBeforeTest("Test Provision device", CoapDeviceType.DEFAULT, TransportPayloadType.PROTOBUF, null, null, DeviceProfileProvisionType.DISABLED, null, null);
  95 + super.processBeforeTest("Test Provision device", CoapDeviceType.DEFAULT, TransportPayloadType.PROTOBUF, null, null, null, null, null, null, DeviceProfileProvisionType.DISABLED);
96 96 ProvisionDeviceResponseMsg result = ProvisionDeviceResponseMsg.parseFrom(createCoapClientAndPublish().getPayload());
97 97 Assert.assertNotNull(result);
98 98 Assert.assertEquals(ProvisionResponseStatus.NOT_FOUND.name(), result.getStatus().toString());
99 99 }
100 100
101 101 private void processTestProvisioningCreateNewDeviceWithoutCredentials() throws Exception {
102   - super.processBeforeTest("Test Provision device3", CoapDeviceType.DEFAULT, TransportPayloadType.PROTOBUF, null, null, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES, "testProvisionKey", "testProvisionSecret");
  102 + super.processBeforeTest("Test Provision device3", CoapDeviceType.DEFAULT, TransportPayloadType.PROTOBUF, null, null, null, null, "testProvisionKey", "testProvisionSecret", DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES);
103 103 ProvisionDeviceResponseMsg response = ProvisionDeviceResponseMsg.parseFrom(createCoapClientAndPublish().getPayload());
104 104
105 105 Device createdDevice = deviceService.findDeviceByTenantIdAndName(savedTenant.getTenantId(), "Test Provision device");
... ... @@ -113,7 +113,7 @@ public abstract class AbstractCoapProvisionProtoDeviceTest extends AbstractCoapI
113 113 }
114 114
115 115 private void processTestProvisioningCreateNewDeviceWithAccessToken() throws Exception {
116   - super.processBeforeTest("Test Provision device3", CoapDeviceType.DEFAULT, TransportPayloadType.PROTOBUF, null, null, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES, "testProvisionKey", "testProvisionSecret");
  116 + super.processBeforeTest("Test Provision device3", CoapDeviceType.DEFAULT, TransportPayloadType.PROTOBUF, null, null, null, null, "testProvisionKey", "testProvisionSecret", DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES);
117 117 CredentialsDataProto requestCredentials = CredentialsDataProto.newBuilder().setValidateDeviceTokenRequestMsg(ValidateDeviceTokenRequestMsg.newBuilder().setToken("test_token").build()).build();
118 118
119 119 ProvisionDeviceResponseMsg response = ProvisionDeviceResponseMsg.parseFrom(createCoapClientAndPublish(createTestsProvisionMessage(CredentialsType.ACCESS_TOKEN, requestCredentials)).getPayload());
... ... @@ -131,7 +131,7 @@ public abstract class AbstractCoapProvisionProtoDeviceTest extends AbstractCoapI
131 131 }
132 132
133 133 private void processTestProvisioningCreateNewDeviceWithCert() throws Exception {
134   - super.processBeforeTest("Test Provision device3", CoapDeviceType.DEFAULT, TransportPayloadType.PROTOBUF, null, null, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES, "testProvisionKey", "testProvisionSecret");
  134 + super.processBeforeTest("Test Provision device3", CoapDeviceType.DEFAULT, TransportPayloadType.PROTOBUF, null, null, null, null, "testProvisionKey", "testProvisionSecret", DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES);
135 135 CredentialsDataProto requestCredentials = CredentialsDataProto.newBuilder().setValidateDeviceX509CertRequestMsg(ValidateDeviceX509CertRequestMsg.newBuilder().setHash("testHash").build()).build();
136 136
137 137 ProvisionDeviceResponseMsg response = ProvisionDeviceResponseMsg.parseFrom(createCoapClientAndPublish(createTestsProvisionMessage(CredentialsType.X509_CERTIFICATE, requestCredentials)).getPayload());
... ... @@ -155,7 +155,7 @@ public abstract class AbstractCoapProvisionProtoDeviceTest extends AbstractCoapI
155 155 }
156 156
157 157 private void processTestProvisioningCheckPreProvisionedDevice() throws Exception {
158   - super.processBeforeTest("Test Provision device", CoapDeviceType.DEFAULT, TransportPayloadType.PROTOBUF, null, null, DeviceProfileProvisionType.CHECK_PRE_PROVISIONED_DEVICES, "testProvisionKey", "testProvisionSecret");
  158 + super.processBeforeTest("Test Provision device", CoapDeviceType.DEFAULT, TransportPayloadType.PROTOBUF, null, null, null, null, "testProvisionKey", "testProvisionSecret", DeviceProfileProvisionType.CHECK_PRE_PROVISIONED_DEVICES);
159 159 ProvisionDeviceResponseMsg response = ProvisionDeviceResponseMsg.parseFrom(createCoapClientAndPublish().getPayload());
160 160
161 161 DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(savedTenant.getTenantId(), savedDevice.getId());
... ... @@ -165,7 +165,7 @@ public abstract class AbstractCoapProvisionProtoDeviceTest extends AbstractCoapI
165 165 }
166 166
167 167 private void processTestProvisioningWithBadKeyDevice() throws Exception {
168   - super.processBeforeTest("Test Provision device", CoapDeviceType.DEFAULT, TransportPayloadType.PROTOBUF, null, null, DeviceProfileProvisionType.CHECK_PRE_PROVISIONED_DEVICES, "testProvisionKeyOrig", "testProvisionSecret");
  168 + super.processBeforeTest("Test Provision device", CoapDeviceType.DEFAULT, TransportPayloadType.PROTOBUF, null, null, null, null, "testProvisionKeyOrig", "testProvisionSecret", DeviceProfileProvisionType.CHECK_PRE_PROVISIONED_DEVICES);
169 169 ProvisionDeviceResponseMsg response = ProvisionDeviceResponseMsg.parseFrom(createCoapClientAndPublish().getPayload());
170 170 Assert.assertEquals(ProvisionResponseStatus.NOT_FOUND.name(), response.getStatus().toString());
171 171 }
... ...
... ... @@ -124,7 +124,7 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC
124 124 return COAP_BASE_URL + token + "/" + FeatureType.RPC.name().toLowerCase() + "/" + requestId;
125 125 }
126 126
127   - private class TestCoapCallback implements CoapHandler {
  127 + protected class TestCoapCallback implements CoapHandler {
128 128
129 129 private final CoapClient client;
130 130 private final CountDownLatch latch;
... ... @@ -136,7 +136,7 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC
136 136
137 137 private Integer observe;
138 138
139   - private TestCoapCallback(CoapClient client, CountDownLatch latch, boolean isOneWayRpc) {
  139 + TestCoapCallback(CoapClient client, CountDownLatch latch, boolean isOneWayRpc) {
140 140 this.client = client;
141 141 this.latch = latch;
142 142 this.isOneWayRpc = isOneWayRpc;
... ... @@ -144,7 +144,7 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC
144 144
145 145 @Override
146 146 public void onLoad(CoapResponse response) {
147   - log.warn("coap response: {}, {}", response, response.getCode());
  147 + log.warn("coap response: {}, {}", response.getResponseText(), response.getCode());
148 148 assertNotNull(response.getPayload());
149 149 assertEquals(response.getCode(), CoAP.ResponseCode.CONTENT);
150 150 observe = response.getOptions().getObserve();
... ...
... ... @@ -36,12 +36,12 @@ public abstract class AbstractCoapServerSideRpcJsonIntegrationTest extends Abstr
36 36 }
37 37
38 38 @Test
39   - public void testServerMqttOneWayRpc() throws Exception {
  39 + public void testServerCoapOneWayRpc() throws Exception {
40 40 processOneWayRpcTest();
41 41 }
42 42
43 43 @Test
44   - public void testServerMqttTwoWayRpc() throws Exception {
  44 + public void testServerCoapTwoWayRpc() throws Exception {
45 45 processTwoWayRpcTest();
46 46 }
47 47
... ...
... ... @@ -15,26 +15,62 @@
15 15 */
16 16 package org.thingsboard.server.transport.coap.rpc;
17 17
  18 +import com.github.os72.protobuf.dynamic.DynamicSchema;
  19 +import com.google.protobuf.Descriptors;
  20 +import com.google.protobuf.DynamicMessage;
  21 +import com.google.protobuf.InvalidProtocolBufferException;
  22 +import com.squareup.wire.schema.internal.parser.ProtoFileElement;
18 23 import lombok.extern.slf4j.Slf4j;
19 24 import org.eclipse.californium.core.CoapClient;
20 25 import org.eclipse.californium.core.CoapHandler;
  26 +import org.eclipse.californium.core.CoapObserveRelation;
21 27 import org.eclipse.californium.core.CoapResponse;
  28 +import org.eclipse.californium.core.coap.CoAP;
22 29 import org.eclipse.californium.core.coap.MediaTypeRegistry;
  30 +import org.eclipse.californium.core.coap.Request;
23 31 import org.junit.After;
24 32 import org.junit.Before;
25 33 import org.junit.Test;
26 34 import org.thingsboard.server.common.data.CoapDeviceType;
  35 +import org.thingsboard.server.common.data.DeviceProfileProvisionType;
27 36 import org.thingsboard.server.common.data.TransportPayloadType;
28   -import org.thingsboard.server.gen.transport.TransportProtos;
  37 +import org.thingsboard.server.common.data.device.profile.CoapDeviceProfileTransportConfiguration;
  38 +import org.thingsboard.server.common.data.device.profile.CoapDeviceTypeConfiguration;
  39 +import org.thingsboard.server.common.data.device.profile.DefaultCoapDeviceTypeConfiguration;
  40 +import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
  41 +import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration;
  42 +import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
  43 +import org.thingsboard.server.common.msg.session.FeatureType;
29 44
  45 +import java.util.List;
30 46 import java.util.concurrent.CountDownLatch;
  47 +import java.util.concurrent.TimeUnit;
  48 +
  49 +import static org.junit.Assert.assertEquals;
  50 +import static org.junit.Assert.assertNotNull;
  51 +import static org.junit.Assert.assertTrue;
  52 +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
31 53
32 54 @Slf4j
33 55 public abstract class AbstractCoapServerSideRpcProtoIntegrationTest extends AbstractCoapServerSideRpcIntegrationTest {
34 56
  57 + private static final String RPC_REQUEST_PROTO_SCHEMA = "syntax =\"proto3\";\n" +
  58 + "package rpc;\n" +
  59 + "\n" +
  60 + "message RpcRequestMsg {\n" +
  61 + " string method = 1;\n" +
  62 + " int32 requestId = 2;\n" +
  63 + " Params params = 3;\n" +
  64 + "\n" +
  65 + " message Params {\n" +
  66 + " string pin = 1;\n" +
  67 + " int32 value = 2;\n" +
  68 + " }\n" +
  69 + "}";
  70 +
35 71 @Before
36 72 public void beforeTest() throws Exception {
37   - processBeforeTest("RPC test device", CoapDeviceType.DEFAULT, TransportPayloadType.PROTOBUF);
  73 + processBeforeTest("RPC test device", CoapDeviceType.DEFAULT, TransportPayloadType.PROTOBUF, null, null, null, RPC_REQUEST_PROTO_SCHEMA, null, null, DeviceProfileProvisionType.DISABLED);
38 74 }
39 75
40 76 @After
... ... @@ -43,33 +79,91 @@ public abstract class AbstractCoapServerSideRpcProtoIntegrationTest extends Abst
43 79 }
44 80
45 81 @Test
46   - public void testServerMqttOneWayRpc() throws Exception {
  82 + public void testServerCoapOneWayRpc() throws Exception {
47 83 processOneWayRpcTest();
48 84 }
49 85
50 86 @Test
51   - public void testServerMqttTwoWayRpc() throws Exception {
  87 + public void testServerCoapTwoWayRpc() throws Exception {
52 88 processTwoWayRpcTest();
53 89 }
54 90
  91 + protected void processTwoWayRpcTest() throws Exception {
  92 + CoapClient client = getCoapClient(FeatureType.RPC);
  93 + client.useCONs();
  94 +
  95 + CountDownLatch latch = new CountDownLatch(1);
  96 + TestCoapCallback testCoapCallback = new TestCoapCallback(client, latch, false);
  97 +
  98 + Request request = Request.newGet().setObserve();
  99 + request.setType(CoAP.Type.CON);
  100 + CoapObserveRelation observeRelation = client.observe(request, testCoapCallback);
  101 +
  102 + String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"26\",\"value\": 1}}";
  103 + String deviceId = savedDevice.getId().getId().toString();
  104 +
  105 + String expected = "{\"payload\":\"{\\\"value1\\\":\\\"A\\\",\\\"value2\\\":\\\"B\\\"}\"}";
  106 +
  107 + String result = doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk());
  108 + latch.await(3, TimeUnit.SECONDS);
  109 +
  110 + assertEquals(expected, result);
  111 + assertEquals(0, testCoapCallback.getObserve().intValue());
  112 + observeRelation.proactiveCancel();
  113 + assertTrue(observeRelation.isCanceled());
  114 + }
  115 +
55 116 @Override
56 117 protected void processOnLoadResponse(CoapResponse response, CoapClient client, Integer observe, CountDownLatch latch) {
57 118 client.setURI(getRpcResponseFeatureTokenUrl(accessToken, observe));
58   - TransportProtos.ToDeviceRpcResponseMsg toDeviceRpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
59   - .setPayload(DEVICE_RESPONSE)
60   - .setRequestId(observe)
61   - .build();
62   - client.post(new CoapHandler() {
63   - @Override
64   - public void onLoad(CoapResponse response) {
65   - log.warn("Command Response Ack: {}, {}", response.getCode(), response.getResponseText());
66   - latch.countDown();
67   - }
  119 + ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = getProtoTransportPayloadConfiguration();
  120 + ProtoFileElement rpcRequestProtoSchemaFile = protoTransportPayloadConfiguration.getTransportProtoSchema(RPC_REQUEST_PROTO_SCHEMA);
  121 + DynamicSchema rpcRequestProtoSchema = protoTransportPayloadConfiguration.getDynamicSchema(rpcRequestProtoSchemaFile, ProtoTransportPayloadConfiguration.RPC_REQUEST_PROTO_SCHEMA);
68 122
69   - @Override
70   - public void onError() {
71   - log.warn("Command Response Ack Error, No connect");
  123 + byte[] requestPayload = response.getPayload();
  124 + DynamicMessage.Builder rpcRequestMsg = rpcRequestProtoSchema.newMessageBuilder("RpcRequestMsg");
  125 + Descriptors.Descriptor rpcRequestMsgDescriptor = rpcRequestMsg.getDescriptorForType();
  126 + assertNotNull(rpcRequestMsgDescriptor);
  127 + try {
  128 + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(rpcRequestMsgDescriptor, requestPayload);
  129 + List<Descriptors.FieldDescriptor> fields = rpcRequestMsgDescriptor.getFields();
  130 + for (Descriptors.FieldDescriptor fieldDescriptor: fields) {
  131 + assertTrue(dynamicMessage.hasField(fieldDescriptor));
72 132 }
73   - }, toDeviceRpcResponseMsg.toByteArray(), MediaTypeRegistry.APPLICATION_JSON);
  133 + ProtoFileElement rpcResponseProtoSchemaFile = protoTransportPayloadConfiguration.getTransportProtoSchema(DEVICE_RPC_RESPONSE_PROTO_SCHEMA);
  134 + DynamicSchema rpcResponseProtoSchema = protoTransportPayloadConfiguration.getDynamicSchema(rpcResponseProtoSchemaFile, ProtoTransportPayloadConfiguration.RPC_RESPONSE_PROTO_SCHEMA);
  135 + DynamicMessage.Builder rpcResponseBuilder = rpcResponseProtoSchema.newMessageBuilder("RpcResponseMsg");
  136 + Descriptors.Descriptor rpcResponseMsgDescriptor = rpcResponseBuilder.getDescriptorForType();
  137 + assertNotNull(rpcResponseMsgDescriptor);
  138 + DynamicMessage rpcResponseMsg = rpcResponseBuilder
  139 + .setField(rpcResponseMsgDescriptor.findFieldByName("payload"), DEVICE_RESPONSE)
  140 + .build();
  141 + client.post(new CoapHandler() {
  142 + @Override
  143 + public void onLoad(CoapResponse response) {
  144 + log.warn("Command Response Ack: {}, {}", response.getCode(), response.getResponseText());
  145 + latch.countDown();
  146 + }
  147 +
  148 + @Override
  149 + public void onError() {
  150 + log.warn("Command Response Ack Error, No connect");
  151 + }
  152 + }, rpcResponseMsg.toByteArray(), MediaTypeRegistry.APPLICATION_JSON);
  153 + } catch (InvalidProtocolBufferException e) {
  154 + log.warn("Command Response Ack Error, Invalid response received: ", e);
  155 + }
  156 + }
  157 +
  158 + private ProtoTransportPayloadConfiguration getProtoTransportPayloadConfiguration() {
  159 + DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
  160 + assertTrue(transportConfiguration instanceof CoapDeviceProfileTransportConfiguration);
  161 + CoapDeviceProfileTransportConfiguration coapDeviceProfileTransportConfiguration = (CoapDeviceProfileTransportConfiguration) transportConfiguration;
  162 + CoapDeviceTypeConfiguration coapDeviceTypeConfiguration = coapDeviceProfileTransportConfiguration.getCoapDeviceTypeConfiguration();
  163 + assertTrue(coapDeviceTypeConfiguration instanceof DefaultCoapDeviceTypeConfiguration);
  164 + DefaultCoapDeviceTypeConfiguration defaultCoapDeviceTypeConfiguration = (DefaultCoapDeviceTypeConfiguration) coapDeviceTypeConfiguration;
  165 + TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = defaultCoapDeviceTypeConfiguration.getTransportPayloadTypeConfiguration();
  166 + assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration);
  167 + return (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
74 168 }
75 169 }
... ...
... ... @@ -114,7 +114,7 @@ public abstract class AbstractCoapTimeseriesProtoIntegrationTest extends Abstrac
114 114 " }\n" +
115 115 " }\n" +
116 116 "}";
117   - super.processBeforeTest("Test Post Telemetry device proto payload", CoapDeviceType.DEFAULT, TransportPayloadType.PROTOBUF, schemaStr, null, DeviceProfileProvisionType.DISABLED, null, null);
  117 + super.processBeforeTest("Test Post Telemetry device proto payload", CoapDeviceType.DEFAULT, TransportPayloadType.PROTOBUF, schemaStr, null, null, null, null, null, DeviceProfileProvisionType.DISABLED);
118 118 DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
119 119 assertTrue(transportConfiguration instanceof CoapDeviceProfileTransportConfiguration);
120 120 CoapDeviceProfileTransportConfiguration coapDeviceProfileTransportConfiguration = (CoapDeviceProfileTransportConfiguration) transportConfiguration;
... ...
... ... @@ -62,7 +62,7 @@ public abstract class AbstractMqttIntegrationTest extends AbstractTransportInteg
62 62 protected DeviceProfile deviceProfile;
63 63
64 64 protected void processBeforeTest (String deviceName, String gatewayName, TransportPayloadType payloadType, String telemetryTopic, String attributesTopic) throws Exception {
65   - this.processBeforeTest(deviceName, gatewayName, payloadType, telemetryTopic, attributesTopic, null, null, DeviceProfileProvisionType.DISABLED, null, null);
  65 + this.processBeforeTest(deviceName, gatewayName, payloadType, telemetryTopic, attributesTopic, null, null, null, null, null, null, DeviceProfileProvisionType.DISABLED);
66 66 }
67 67
68 68 protected void processBeforeTest(String deviceName,
... ... @@ -72,9 +72,12 @@ public abstract class AbstractMqttIntegrationTest extends AbstractTransportInteg
72 72 String attributesTopic,
73 73 String telemetryProtoSchema,
74 74 String attributesProtoSchema,
75   - DeviceProfileProvisionType provisionType,
76   - String provisionKey, String provisionSecret
77   - ) throws Exception {
  75 + String rpcResponseProtoSchema,
  76 + String rpcRequestProtoSchema,
  77 + String provisionKey,
  78 + String provisionSecret,
  79 + DeviceProfileProvisionType provisionType
  80 + ) throws Exception {
78 81 loginSysAdmin();
79 82
80 83 Tenant tenant = new Tenant();
... ... @@ -103,7 +106,7 @@ public abstract class AbstractMqttIntegrationTest extends AbstractTransportInteg
103 106 gateway.setAdditionalInfo(additionalInfo);
104 107
105 108 if (payloadType != null) {
106   - DeviceProfile mqttDeviceProfile = createMqttDeviceProfile(payloadType, telemetryTopic, attributesTopic, telemetryProtoSchema, attributesProtoSchema, provisionType, provisionKey, provisionSecret);
  109 + DeviceProfile mqttDeviceProfile = createMqttDeviceProfile(payloadType, telemetryTopic, attributesTopic, telemetryProtoSchema, attributesProtoSchema, rpcResponseProtoSchema, rpcRequestProtoSchema, provisionKey, provisionSecret, provisionType);
107 110 deviceProfile = doPost("/api/deviceProfile", mqttDeviceProfile, DeviceProfile.class);
108 111 device.setType(deviceProfile.getName());
109 112 device.setDeviceProfileId(deviceProfile.getId());
... ... @@ -157,8 +160,9 @@ public abstract class AbstractMqttIntegrationTest extends AbstractTransportInteg
157 160 protected DeviceProfile createMqttDeviceProfile(TransportPayloadType transportPayloadType,
158 161 String telemetryTopic, String attributesTopic,
159 162 String telemetryProtoSchema, String attributesProtoSchema,
160   - DeviceProfileProvisionType provisionType,
161   - String provisionKey, String provisionSecret) {
  163 + String rpcResponseProtoSchema, String rpcRequestProtoSchema,
  164 + String provisionKey, String provisionSecret,
  165 + DeviceProfileProvisionType provisionType) {
162 166 DeviceProfile deviceProfile = new DeviceProfile();
163 167 deviceProfile.setName(transportPayloadType.name());
164 168 deviceProfile.setType(DeviceProfileType.DEFAULT);
... ... @@ -186,8 +190,16 @@ public abstract class AbstractMqttIntegrationTest extends AbstractTransportInteg
186 190 if (StringUtils.isEmpty(attributesProtoSchema)) {
187 191 attributesProtoSchema = DEVICE_ATTRIBUTES_PROTO_SCHEMA;
188 192 }
  193 + if (StringUtils.isEmpty(rpcResponseProtoSchema)) {
  194 + rpcResponseProtoSchema = DEVICE_RPC_RESPONSE_PROTO_SCHEMA;
  195 + }
  196 + if (StringUtils.isEmpty(rpcRequestProtoSchema)) {
  197 + rpcRequestProtoSchema = DEVICE_RPC_REQUEST_PROTO_SCHEMA;
  198 + }
189 199 protoTransportPayloadConfiguration.setDeviceTelemetryProtoSchema(telemetryProtoSchema);
190 200 protoTransportPayloadConfiguration.setDeviceAttributesProtoSchema(attributesProtoSchema);
  201 + protoTransportPayloadConfiguration.setDeviceRpcResponseProtoSchema(rpcResponseProtoSchema);
  202 + protoTransportPayloadConfiguration.setDeviceRpcRequestProtoSchema(rpcRequestProtoSchema);
191 203 transportPayloadTypeConfiguration = protoTransportPayloadConfiguration;
192 204 }
193 205 mqttDeviceProfileTransportConfiguration.setTransportPayloadTypeConfiguration(transportPayloadTypeConfiguration);
... ...
... ... @@ -83,7 +83,7 @@ public abstract class AbstractMqttAttributesRequestProtoIntegrationTest extends
83 83 @Test
84 84 public void testRequestAttributesValuesFromTheServer() throws Exception {
85 85 super.processBeforeTest("Test Request attribute values from the server proto", "Gateway Test Request attribute values from the server proto",
86   - TransportPayloadType.PROTOBUF, null, null, null, ATTRIBUTES_SCHEMA_STR, DeviceProfileProvisionType.DISABLED, null, null);
  86 + TransportPayloadType.PROTOBUF, null, null, null, ATTRIBUTES_SCHEMA_STR, null, null, null, null, DeviceProfileProvisionType.DISABLED);
87 87 processTestRequestAttributesValuesFromTheServer();
88 88 }
89 89
... ...
... ... @@ -94,7 +94,7 @@ public abstract class AbstractMqttProvisionJsonDeviceTest extends AbstractMqttIn
94 94
95 95
96 96 protected void processTestProvisioningDisabledDevice() throws Exception {
97   - super.processBeforeTest("Test Provision device", "Test Provision gateway", TransportPayloadType.JSON, null, null, null, null, DeviceProfileProvisionType.DISABLED, null, null);
  97 + super.processBeforeTest("Test Provision device", "Test Provision gateway", TransportPayloadType.JSON, null, null, null, null, null, null, null, null, DeviceProfileProvisionType.DISABLED);
98 98 byte[] result = createMqttClientAndPublish().getPayloadBytes();
99 99 JsonObject response = JsonUtils.parse(new String(result)).getAsJsonObject();
100 100 Assert.assertEquals("Provision data was not found!", response.get("errorMsg").getAsString());
... ... @@ -103,7 +103,7 @@ public abstract class AbstractMqttProvisionJsonDeviceTest extends AbstractMqttIn
103 103
104 104
105 105 protected void processTestProvisioningCreateNewDeviceWithoutCredentials() throws Exception {
106   - super.processBeforeTest("Test Provision device3", "Test Provision gateway", TransportPayloadType.JSON, null, null, null, null, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES, "testProvisionKey", "testProvisionSecret");
  106 + super.processBeforeTest("Test Provision device3", "Test Provision gateway", TransportPayloadType.JSON, null, null, null, null, null, null, "testProvisionKey", "testProvisionSecret", DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES);
107 107 byte[] result = createMqttClientAndPublish().getPayloadBytes();
108 108 JsonObject response = JsonUtils.parse(new String(result)).getAsJsonObject();
109 109
... ... @@ -119,7 +119,7 @@ public abstract class AbstractMqttProvisionJsonDeviceTest extends AbstractMqttIn
119 119
120 120
121 121 protected void processTestProvisioningCreateNewDeviceWithAccessToken() throws Exception {
122   - super.processBeforeTest("Test Provision device3", "Test Provision gateway", TransportPayloadType.JSON, null, null, null, null, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES, "testProvisionKey", "testProvisionSecret");
  122 + super.processBeforeTest("Test Provision device3", "Test Provision gateway", TransportPayloadType.JSON, null, null, null, null, null, null, "testProvisionKey", "testProvisionSecret", DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES);
123 123 String requestCredentials = ",\"credentialsType\": \"ACCESS_TOKEN\",\"token\": \"test_token\"";
124 124 byte[] result = createMqttClientAndPublish(requestCredentials).getPayloadBytes();
125 125 JsonObject response = JsonUtils.parse(new String(result)).getAsJsonObject();
... ... @@ -138,7 +138,7 @@ public abstract class AbstractMqttProvisionJsonDeviceTest extends AbstractMqttIn
138 138
139 139
140 140 protected void processTestProvisioningCreateNewDeviceWithCert() throws Exception {
141   - super.processBeforeTest("Test Provision device3", "Test Provision gateway", TransportPayloadType.JSON, null, null, null, null, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES, "testProvisionKey", "testProvisionSecret");
  141 + super.processBeforeTest("Test Provision device3", "Test Provision gateway", TransportPayloadType.JSON, null, null, null, null, null, null, "testProvisionKey", "testProvisionSecret", DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES);
142 142 String requestCredentials = ",\"credentialsType\": \"X509_CERTIFICATE\",\"hash\": \"testHash\"";
143 143 byte[] result = createMqttClientAndPublish(requestCredentials).getPayloadBytes();
144 144 JsonObject response = JsonUtils.parse(new String(result)).getAsJsonObject();
... ... @@ -163,7 +163,7 @@ public abstract class AbstractMqttProvisionJsonDeviceTest extends AbstractMqttIn
163 163
164 164
165 165 protected void processTestProvisioningCreateNewDeviceWithMqttBasic() throws Exception {
166   - super.processBeforeTest("Test Provision device3", "Test Provision gateway", TransportPayloadType.JSON, null, null, null, null, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES, "testProvisionKey", "testProvisionSecret");
  166 + super.processBeforeTest("Test Provision device3", "Test Provision gateway", TransportPayloadType.JSON, null, null, null, null, null, null, "testProvisionKey", "testProvisionSecret", DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES);
167 167 String requestCredentials = ",\"credentialsType\": \"MQTT_BASIC\",\"clientId\": \"test_clientId\",\"username\": \"test_username\",\"password\": \"test_password\"";
168 168 byte[] result = createMqttClientAndPublish(requestCredentials).getPayloadBytes();
169 169 JsonObject response = JsonUtils.parse(new String(result)).getAsJsonObject();
... ... @@ -188,7 +188,7 @@ public abstract class AbstractMqttProvisionJsonDeviceTest extends AbstractMqttIn
188 188 }
189 189
190 190 protected void processTestProvisioningCheckPreProvisionedDevice() throws Exception {
191   - super.processBeforeTest("Test Provision device", "Test Provision gateway", TransportPayloadType.JSON, null, null, null, null, DeviceProfileProvisionType.CHECK_PRE_PROVISIONED_DEVICES, "testProvisionKey", "testProvisionSecret");
  191 + super.processBeforeTest("Test Provision device", "Test Provision gateway", TransportPayloadType.JSON, null, null, null, null, null, null, "testProvisionKey", "testProvisionSecret", DeviceProfileProvisionType.CHECK_PRE_PROVISIONED_DEVICES);
192 192 byte[] result = createMqttClientAndPublish().getPayloadBytes();
193 193 JsonObject response = JsonUtils.parse(new String(result)).getAsJsonObject();
194 194
... ... @@ -199,7 +199,7 @@ public abstract class AbstractMqttProvisionJsonDeviceTest extends AbstractMqttIn
199 199 }
200 200
201 201 protected void processTestProvisioningWithBadKeyDevice() throws Exception {
202   - super.processBeforeTest("Test Provision device", "Test Provision gateway", TransportPayloadType.JSON, null, null, null, null, DeviceProfileProvisionType.CHECK_PRE_PROVISIONED_DEVICES, "testProvisionKeyOrig", "testProvisionSecret");
  202 + super.processBeforeTest("Test Provision device", "Test Provision gateway", TransportPayloadType.JSON, null, null, null, null, null, null, "testProvisionKeyOrig", "testProvisionSecret", DeviceProfileProvisionType.CHECK_PRE_PROVISIONED_DEVICES);
203 203 byte[] result = createMqttClientAndPublish().getPayloadBytes();
204 204 JsonObject response = JsonUtils.parse(new String(result)).getAsJsonObject();
205 205 Assert.assertEquals("Provision data was not found!", response.get("errorMsg").getAsString());
... ...
... ... @@ -101,14 +101,14 @@ public abstract class AbstractMqttProvisionProtoDeviceTest extends AbstractMqttI
101 101
102 102
103 103 protected void processTestProvisioningDisabledDevice() throws Exception {
104   - super.processBeforeTest("Test Provision device", "Test Provision gateway", TransportPayloadType.PROTOBUF, null, null, null, null, DeviceProfileProvisionType.DISABLED, null, null);
  104 + super.processBeforeTest("Test Provision device", "Test Provision gateway", TransportPayloadType.PROTOBUF, null, null, null, null, null, null, null, null, DeviceProfileProvisionType.DISABLED);
105 105 ProvisionDeviceResponseMsg result = ProvisionDeviceResponseMsg.parseFrom(createMqttClientAndPublish().getPayloadBytes());
106 106 Assert.assertNotNull(result);
107 107 Assert.assertEquals(ProvisionResponseStatus.NOT_FOUND.name(), result.getStatus().toString());
108 108 }
109 109
110 110 protected void processTestProvisioningCreateNewDeviceWithoutCredentials() throws Exception {
111   - super.processBeforeTest("Test Provision device3", "Test Provision gateway", TransportPayloadType.PROTOBUF, null, null, null, null, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES, "testProvisionKey", "testProvisionSecret");
  111 + super.processBeforeTest("Test Provision device3", "Test Provision gateway", TransportPayloadType.PROTOBUF, null, null, null, null, null, null, "testProvisionKey", "testProvisionSecret", DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES);
112 112 ProvisionDeviceResponseMsg response = ProvisionDeviceResponseMsg.parseFrom(createMqttClientAndPublish().getPayloadBytes());
113 113
114 114 Device createdDevice = deviceService.findDeviceByTenantIdAndName(savedTenant.getTenantId(), "Test Provision device");
... ... @@ -122,7 +122,7 @@ public abstract class AbstractMqttProvisionProtoDeviceTest extends AbstractMqttI
122 122 }
123 123
124 124 protected void processTestProvisioningCreateNewDeviceWithAccessToken() throws Exception {
125   - super.processBeforeTest("Test Provision device3", "Test Provision gateway", TransportPayloadType.PROTOBUF, null, null,null, null, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES, "testProvisionKey", "testProvisionSecret");
  125 + super.processBeforeTest("Test Provision device3", "Test Provision gateway", TransportPayloadType.PROTOBUF, null, null,null, null, null, null, "testProvisionKey", "testProvisionSecret", DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES);
126 126 CredentialsDataProto requestCredentials = CredentialsDataProto.newBuilder().setValidateDeviceTokenRequestMsg(ValidateDeviceTokenRequestMsg.newBuilder().setToken("test_token").build()).build();
127 127
128 128 ProvisionDeviceResponseMsg response = ProvisionDeviceResponseMsg.parseFrom(createMqttClientAndPublish(createTestsProvisionMessage(CredentialsType.ACCESS_TOKEN, requestCredentials)).getPayloadBytes());
... ... @@ -140,7 +140,7 @@ public abstract class AbstractMqttProvisionProtoDeviceTest extends AbstractMqttI
140 140 }
141 141
142 142 protected void processTestProvisioningCreateNewDeviceWithCert() throws Exception {
143   - super.processBeforeTest("Test Provision device3", "Test Provision gateway", TransportPayloadType.PROTOBUF, null, null, null, null, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES, "testProvisionKey", "testProvisionSecret");
  143 + super.processBeforeTest("Test Provision device3", "Test Provision gateway", TransportPayloadType.PROTOBUF, null, null, null, null, null, null, "testProvisionKey", "testProvisionSecret", DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES);
144 144 CredentialsDataProto requestCredentials = CredentialsDataProto.newBuilder().setValidateDeviceX509CertRequestMsg(ValidateDeviceX509CertRequestMsg.newBuilder().setHash("testHash").build()).build();
145 145
146 146 ProvisionDeviceResponseMsg response = ProvisionDeviceResponseMsg.parseFrom(createMqttClientAndPublish(createTestsProvisionMessage(CredentialsType.X509_CERTIFICATE, requestCredentials)).getPayloadBytes());
... ... @@ -164,7 +164,7 @@ public abstract class AbstractMqttProvisionProtoDeviceTest extends AbstractMqttI
164 164 }
165 165
166 166 protected void processTestProvisioningCreateNewDeviceWithMqttBasic() throws Exception {
167   - super.processBeforeTest("Test Provision device3", "Test Provision gateway", TransportPayloadType.PROTOBUF, null, null, null, null, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES, "testProvisionKey", "testProvisionSecret");
  167 + super.processBeforeTest("Test Provision device3", "Test Provision gateway", TransportPayloadType.PROTOBUF, null, null, null, null, null, null, "testProvisionKey", "testProvisionSecret", DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES);
168 168 CredentialsDataProto requestCredentials = CredentialsDataProto.newBuilder().setValidateBasicMqttCredRequestMsg(
169 169 ValidateBasicMqttCredRequestMsg.newBuilder()
170 170 .setClientId("test_clientId")
... ... @@ -195,7 +195,7 @@ public abstract class AbstractMqttProvisionProtoDeviceTest extends AbstractMqttI
195 195 }
196 196
197 197 protected void processTestProvisioningCheckPreProvisionedDevice() throws Exception {
198   - super.processBeforeTest("Test Provision device", "Test Provision gateway", TransportPayloadType.PROTOBUF, null, null, null, null, DeviceProfileProvisionType.CHECK_PRE_PROVISIONED_DEVICES, "testProvisionKey", "testProvisionSecret");
  198 + super.processBeforeTest("Test Provision device", "Test Provision gateway", TransportPayloadType.PROTOBUF, null, null, null, null, null, null, "testProvisionKey", "testProvisionSecret", DeviceProfileProvisionType.CHECK_PRE_PROVISIONED_DEVICES);
199 199 ProvisionDeviceResponseMsg response = ProvisionDeviceResponseMsg.parseFrom(createMqttClientAndPublish().getPayloadBytes());
200 200
201 201 DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(savedTenant.getTenantId(), savedDevice.getId());
... ... @@ -205,7 +205,7 @@ public abstract class AbstractMqttProvisionProtoDeviceTest extends AbstractMqttI
205 205 }
206 206
207 207 protected void processTestProvisioningWithBadKeyDevice() throws Exception {
208   - super.processBeforeTest("Test Provision device", "Test Provision gateway", TransportPayloadType.PROTOBUF, null, null, null, null, DeviceProfileProvisionType.CHECK_PRE_PROVISIONED_DEVICES, "testProvisionKeyOrig", "testProvisionSecret");
  208 + super.processBeforeTest("Test Provision device", "Test Provision gateway", TransportPayloadType.PROTOBUF, null, null, null, null, null, null, "testProvisionKeyOrig", "testProvisionSecret", DeviceProfileProvisionType.CHECK_PRE_PROVISIONED_DEVICES);
209 209 ProvisionDeviceResponseMsg response = ProvisionDeviceResponseMsg.parseFrom(createMqttClientAndPublish().getPayloadBytes());
210 210 Assert.assertEquals(ProvisionResponseStatus.NOT_FOUND.name(), response.getStatus().toString());
211 211 }
... ...
... ... @@ -180,7 +180,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
180 180 return message;
181 181 }
182 182
183   - private class TestMqttCallback implements MqttCallback {
  183 + protected class TestMqttCallback implements MqttCallback {
184 184
185 185 private final MqttAsyncClient client;
186 186 private final CountDownLatch latch;
... ...
... ... @@ -15,25 +15,60 @@
15 15 */
16 16 package org.thingsboard.server.transport.mqtt.rpc;
17 17
  18 +import com.github.os72.protobuf.dynamic.DynamicSchema;
  19 +import com.google.protobuf.Descriptors;
  20 +import com.google.protobuf.DynamicMessage;
18 21 import com.google.protobuf.InvalidProtocolBufferException;
  22 +import com.squareup.wire.schema.internal.parser.ProtoFileElement;
19 23 import lombok.extern.slf4j.Slf4j;
  24 +import org.eclipse.californium.core.CoapHandler;
  25 +import org.eclipse.californium.core.CoapResponse;
  26 +import org.eclipse.californium.core.coap.MediaTypeRegistry;
20 27 import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
21 28 import org.eclipse.paho.client.mqttv3.MqttException;
22 29 import org.eclipse.paho.client.mqttv3.MqttMessage;
  30 +import org.jetbrains.annotations.NotNull;
23 31 import org.junit.After;
  32 +import org.junit.Assert;
24 33 import org.junit.Before;
25 34 import org.junit.Test;
  35 +import org.thingsboard.server.common.data.DeviceProfileProvisionType;
26 36 import org.thingsboard.server.common.data.TransportPayloadType;
  37 +import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
  38 +import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
27 39 import org.thingsboard.server.common.data.device.profile.MqttTopics;
  40 +import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration;
  41 +import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
28 42 import org.thingsboard.server.gen.transport.TransportApiProtos;
29   -import org.thingsboard.server.gen.transport.TransportProtos;
  43 +
  44 +import java.util.List;
  45 +import java.util.concurrent.CountDownLatch;
  46 +import java.util.concurrent.TimeUnit;
  47 +
  48 +import static org.junit.Assert.assertNotNull;
  49 +import static org.junit.Assert.assertTrue;
  50 +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
30 51
31 52 @Slf4j
32 53 public abstract class AbstractMqttServerSideRpcProtoIntegrationTest extends AbstractMqttServerSideRpcIntegrationTest {
33 54
  55 + private static final String RPC_REQUEST_PROTO_SCHEMA = "syntax =\"proto3\";\n" +
  56 + "package rpc;\n" +
  57 + "\n" +
  58 + "message RpcRequestMsg {\n" +
  59 + " string method = 1;\n" +
  60 + " int32 requestId = 2;\n" +
  61 + " Params params = 3;\n" +
  62 + "\n" +
  63 + " message Params {\n" +
  64 + " string pin = 1;\n" +
  65 + " int32 value = 2;\n" +
  66 + " }\n" +
  67 + "}";
  68 +
34 69 @Before
35 70 public void beforeTest() throws Exception {
36   - processBeforeTest("RPC test device", "RPC test gateway", TransportPayloadType.PROTOBUF, null, null);
  71 + processBeforeTest("RPC test device", "RPC test gateway", TransportPayloadType.PROTOBUF, null, null, null, null, null, RPC_REQUEST_PROTO_SCHEMA, null, null, DeviceProfileProvisionType.DISABLED);
37 72 }
38 73
39 74 @After
... ... @@ -83,14 +118,55 @@ public abstract class AbstractMqttServerSideRpcProtoIntegrationTest extends Abst
83 118 return builder.build();
84 119 }
85 120
  121 + protected void processTwoWayRpcTest() throws Exception {
  122 + MqttAsyncClient client = getMqttAsyncClient(accessToken);
  123 + client.subscribe(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC, 1);
  124 +
  125 + CountDownLatch latch = new CountDownLatch(1);
  126 + TestMqttCallback callback = new TestMqttCallback(client, latch);
  127 + client.setCallback(callback);
  128 +
  129 + Thread.sleep(1000);
  130 +
  131 + String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"26\",\"value\": 1}}";
  132 + String deviceId = savedDevice.getId().getId().toString();
  133 +
  134 + String result = doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk());
  135 + String expected = "{\"payload\":\"{\\\"value1\\\":\\\"A\\\",\\\"value2\\\":\\\"B\\\"}\"}";
  136 + latch.await(3, TimeUnit.SECONDS);
  137 + Assert.assertEquals(expected, result);
  138 + }
  139 +
86 140 protected MqttMessage processMessageArrived(String requestTopic, MqttMessage mqttMessage) throws MqttException, InvalidProtocolBufferException {
87 141 MqttMessage message = new MqttMessage();
88 142 if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC)) {
89   - TransportProtos.ToDeviceRpcResponseMsg toDeviceRpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
90   - .setPayload(DEVICE_RESPONSE)
91   - .setRequestId(0)
  143 + ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = getProtoTransportPayloadConfiguration();
  144 + ProtoFileElement rpcRequestProtoSchemaFile = protoTransportPayloadConfiguration.getTransportProtoSchema(RPC_REQUEST_PROTO_SCHEMA);
  145 + DynamicSchema rpcRequestProtoSchema = protoTransportPayloadConfiguration.getDynamicSchema(rpcRequestProtoSchemaFile, ProtoTransportPayloadConfiguration.RPC_REQUEST_PROTO_SCHEMA);
  146 +
  147 + byte[] requestPayload = mqttMessage.getPayload();
  148 + DynamicMessage.Builder rpcRequestMsg = rpcRequestProtoSchema.newMessageBuilder("RpcRequestMsg");
  149 + Descriptors.Descriptor rpcRequestMsgDescriptor = rpcRequestMsg.getDescriptorForType();
  150 + assertNotNull(rpcRequestMsgDescriptor);
  151 + try {
  152 + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(rpcRequestMsgDescriptor, requestPayload);
  153 + List<Descriptors.FieldDescriptor> fields = rpcRequestMsgDescriptor.getFields();
  154 + for (Descriptors.FieldDescriptor fieldDescriptor: fields) {
  155 + assertTrue(dynamicMessage.hasField(fieldDescriptor));
  156 + }
  157 + ProtoFileElement transportProtoSchemaFile = protoTransportPayloadConfiguration.getTransportProtoSchema(DEVICE_RPC_RESPONSE_PROTO_SCHEMA);
  158 + DynamicSchema rpcResponseProtoSchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchemaFile, ProtoTransportPayloadConfiguration.RPC_RESPONSE_PROTO_SCHEMA);
  159 +
  160 + DynamicMessage.Builder rpcResponseBuilder = rpcResponseProtoSchema.newMessageBuilder("RpcResponseMsg");
  161 + Descriptors.Descriptor rpcResponseMsgDescriptor = rpcResponseBuilder.getDescriptorForType();
  162 + assertNotNull(rpcResponseMsgDescriptor);
  163 + DynamicMessage rpcResponseMsg = rpcResponseBuilder
  164 + .setField(rpcResponseMsgDescriptor.findFieldByName("payload"), DEVICE_RESPONSE)
92 165 .build();
93   - message.setPayload(toDeviceRpcResponseMsg.toByteArray());
  166 + message.setPayload(rpcResponseMsg.toByteArray());
  167 + } catch (InvalidProtocolBufferException e) {
  168 + log.warn("Command Response Ack Error, Invalid response received: ", e);
  169 + }
94 170 } else {
95 171 TransportApiProtos.GatewayDeviceRpcRequestMsg msg = TransportApiProtos.GatewayDeviceRpcRequestMsg.parseFrom(mqttMessage.getPayload());
96 172 String deviceName = msg.getDeviceName();
... ... @@ -105,6 +181,14 @@ public abstract class AbstractMqttServerSideRpcProtoIntegrationTest extends Abst
105 181 return message;
106 182 }
107 183
  184 + private ProtoTransportPayloadConfiguration getProtoTransportPayloadConfiguration() {
  185 + DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
  186 + assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
  187 + MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
  188 + TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttTransportConfiguration.getTransportPayloadTypeConfiguration();
  189 + assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration);
  190 + return (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
  191 + }
108 192
109 193
110 194 }
... ...
... ... @@ -119,7 +119,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
119 119 " }\n" +
120 120 " }\n" +
121 121 "}";
122   - super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null, schemaStr, null, DeviceProfileProvisionType.DISABLED, null, null);
  122 + super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null, schemaStr, null, null, null, null, null, DeviceProfileProvisionType.DISABLED);
123 123 List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
124 124 DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
125 125 assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
... ... @@ -172,7 +172,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
172 172
173 173 @Test
174 174 public void testPushMqttTelemetryGateway() throws Exception {
175   - super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, null, null, null, null, DeviceProfileProvisionType.DISABLED, null, null);
  175 + super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, null, null, null, null, null, null, null, null, DeviceProfileProvisionType.DISABLED);
176 176 TransportApiProtos.GatewayTelemetryMsg.Builder gatewayTelemetryMsgProtoBuilder = TransportApiProtos.GatewayTelemetryMsg.newBuilder();
177 177 List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
178 178 String deviceName1 = "Device A";
... ... @@ -186,7 +186,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
186 186
187 187 @Test
188 188 public void testGatewayConnect() throws Exception {
189   - super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null, null, null, DeviceProfileProvisionType.DISABLED, null, null);
  189 + super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null, null, null, null, null, null, null, DeviceProfileProvisionType.DISABLED);
190 190 String deviceName = "Device A";
191 191 TransportApiProtos.ConnectMsg connectMsgProto = getConnectProto(deviceName);
192 192 MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
... ...
... ... @@ -15,7 +15,6 @@
15 15 */
16 16 package org.thingsboard.server.common.data.device.profile;
17 17
18   -import com.fasterxml.jackson.annotation.JsonIgnore;
19 18 import com.github.os72.protobuf.dynamic.DynamicSchema;
20 19 import com.github.os72.protobuf.dynamic.EnumDefinition;
21 20 import com.github.os72.protobuf.dynamic.MessageDefinition;
... ... @@ -46,9 +45,13 @@ public class ProtoTransportPayloadConfiguration implements TransportPayloadTypeC
46 45 public static final Location LOCATION = new Location("", "", -1, -1);
47 46 public static final String ATTRIBUTES_PROTO_SCHEMA = "attributes proto schema";
48 47 public static final String TELEMETRY_PROTO_SCHEMA = "telemetry proto schema";
  48 + public static final String RPC_RESPONSE_PROTO_SCHEMA = "rpc response proto schema";
  49 + public static final String RPC_REQUEST_PROTO_SCHEMA = "rpc request proto schema";
49 50
50 51 private String deviceTelemetryProtoSchema;
51 52 private String deviceAttributesProtoSchema;
  53 + private String deviceRpcRequestProtoSchema;
  54 + private String deviceRpcResponseProtoSchema;
52 55
53 56 @Override
54 57 public TransportPayloadType getTransportPayloadType() {
... ... @@ -63,13 +66,45 @@ public class ProtoTransportPayloadConfiguration implements TransportPayloadTypeC
63 66 return getDescriptor(deviceAttributesProtoSchema, ATTRIBUTES_PROTO_SCHEMA);
64 67 }
65 68
  69 + public Descriptors.Descriptor getRpcResponseDynamicMessageDescriptor(String deviceRpcResponseProtoSchema) {
  70 + return getDescriptor(deviceRpcResponseProtoSchema, RPC_RESPONSE_PROTO_SCHEMA);
  71 + }
  72 +
  73 + public DynamicMessage.Builder getRpcRequestDynamicMessageBuilder(String deviceRpcRequestProtoSchema) {
  74 + return getDynamicMessageBuilder(deviceRpcRequestProtoSchema, RPC_REQUEST_PROTO_SCHEMA);
  75 + }
  76 +
  77 + public String getDeviceRpcResponseProtoSchema() {
  78 + if (!isEmptyStr(deviceRpcResponseProtoSchema)) {
  79 + return deviceRpcResponseProtoSchema;
  80 + } else {
  81 + return "syntax =\"proto3\";\n" +
  82 + "package rpc;\n" +
  83 + "\n" +
  84 + "message RpcResponseMsg {\n" +
  85 + " string payload = 1;\n" +
  86 + "}";
  87 + }
  88 + }
  89 +
  90 + public String getDeviceRpcRequestProtoSchema() {
  91 + if (!isEmptyStr(deviceRpcRequestProtoSchema)) {
  92 + return deviceRpcRequestProtoSchema;
  93 + } else {
  94 + return "syntax =\"proto3\";\n" +
  95 + "package rpc;\n" +
  96 + "\n" +
  97 + "message RpcRequestMsg {\n" +
  98 + " string method = 1;\n" +
  99 + " int32 requestId = 2;\n" +
  100 + " string params = 3;\n" +
  101 + "}";
  102 + }
  103 + }
  104 +
66 105 private Descriptors.Descriptor getDescriptor(String protoSchema, String schemaName) {
67 106 try {
68   - ProtoFileElement protoFileElement = getTransportProtoSchema(protoSchema);
69   - DynamicSchema dynamicSchema = getDynamicSchema(protoFileElement, schemaName);
70   - String lastMsgName = getMessageTypes(protoFileElement.getTypes()).stream()
71   - .map(MessageElement::getName).reduce((previous, last) -> last).get();
72   - DynamicMessage.Builder builder = dynamicSchema.newMessageBuilder(lastMsgName);
  107 + DynamicMessage.Builder builder = getDynamicMessageBuilder(protoSchema, schemaName);
73 108 return builder.getDescriptorForType();
74 109 } catch (Exception e) {
75 110 log.warn("Failed to get Message Descriptor due to {}", e.getMessage());
... ... @@ -77,6 +112,14 @@ public class ProtoTransportPayloadConfiguration implements TransportPayloadTypeC
77 112 }
78 113 }
79 114
  115 + public DynamicMessage.Builder getDynamicMessageBuilder(String protoSchema, String schemaName) {
  116 + ProtoFileElement protoFileElement = getTransportProtoSchema(protoSchema);
  117 + DynamicSchema dynamicSchema = getDynamicSchema(protoFileElement, schemaName);
  118 + String lastMsgName = getMessageTypes(protoFileElement.getTypes()).stream()
  119 + .map(MessageElement::getName).reduce((previous, last) -> last).get();
  120 + return dynamicSchema.newMessageBuilder(lastMsgName);
  121 + }
  122 +
80 123 public DynamicSchema getDynamicSchema(ProtoFileElement protoFileElement, String schemaName) {
81 124 DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder();
82 125 schemaBuilder.setName(schemaName);
... ...
... ... @@ -17,6 +17,7 @@ package org.thingsboard.server.transport.coap;
17 17
18 18 import com.google.gson.JsonParseException;
19 19 import com.google.protobuf.Descriptors;
  20 +import com.google.protobuf.DynamicMessage;
20 21 import lombok.Data;
21 22 import lombok.extern.slf4j.Slf4j;
22 23 import org.eclipse.californium.core.coap.CoAP;
... ... @@ -261,7 +262,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
261 262 TransportProtos.SessionInfoProto currentAttrSession = tokenToSessionIdMap.get(getTokenFromRequest(request));
262 263 if (currentAttrSession == null) {
263 264 attributeSubscriptions.add(sessionId);
264   - registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor, getTokenFromRequest(request));
  265 + registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor,
  266 + transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request));
265 267 transportService.process(sessionInfo,
266 268 TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), new CoapNoOpCallback(exchange));
267 269 }
... ... @@ -281,7 +283,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
281 283 TransportProtos.SessionInfoProto currentRpcSession = tokenToSessionIdMap.get(getTokenFromRequest(request));
282 284 if (currentRpcSession == null) {
283 285 rpcSubscriptions.add(sessionId);
284   - registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor, getTokenFromRequest(request));
  286 + registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor,
  287 + transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request));
285 288 transportService.process(sessionInfo,
286 289 TransportProtos.SubscribeToRPCMsg.getDefaultInstance(),
287 290 new CoapNoOpCallback(exchange));
... ... @@ -303,17 +306,19 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
303 306 break;
304 307 case TO_DEVICE_RPC_RESPONSE:
305 308 transportService.process(sessionInfo,
306   - coapTransportAdaptor.convertToDeviceRpcResponse(sessionId, request),
  309 + coapTransportAdaptor.convertToDeviceRpcResponse(sessionId, request, transportConfigurationContainer.getRpcResponseMsgDescriptor()),
307 310 new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
308 311 break;
309 312 case TO_SERVER_RPC_REQUEST:
310   - transportService.registerSyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor), timeout);
  313 + transportService.registerSyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor,
  314 + transportConfigurationContainer.getRpcRequestDynamicMessageBuilder()), timeout);
311 315 transportService.process(sessionInfo,
312 316 coapTransportAdaptor.convertToServerRpcRequest(sessionId, request),
313 317 new CoapNoOpCallback(exchange));
314 318 break;
315 319 case GET_ATTRIBUTES_REQUEST:
316   - transportService.registerSyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor), timeout);
  320 + transportService.registerSyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor,
  321 + transportConfigurationContainer.getRpcRequestDynamicMessageBuilder()), timeout);
317 322 transportService.process(sessionInfo,
318 323 coapTransportAdaptor.convertToGetAttributes(sessionId, request),
319 324 new CoapNoOpCallback(exchange));
... ... @@ -330,14 +335,14 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
330 335 return tokenToSessionIdMap.remove(token);
331 336 }
332 337
333   - private void registerAsyncCoapSession(CoapExchange exchange, TransportProtos.SessionInfoProto sessionInfo, CoapTransportAdaptor coapTransportAdaptor, String token) {
  338 + private void registerAsyncCoapSession(CoapExchange exchange, TransportProtos.SessionInfoProto sessionInfo, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, String token) {
334 339 tokenToSessionIdMap.putIfAbsent(token, sessionInfo);
335   - transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor));
  340 + transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder));
336 341 transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
337 342 }
338 343
339   - private CoapSessionListener getCoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor) {
340   - return new CoapSessionListener(exchange, coapTransportAdaptor);
  344 + private CoapSessionListener getCoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) {
  345 + return new CoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder);
341 346 }
342 347
343 348 private String getTokenFromRequest(Request request) {
... ... @@ -423,10 +428,12 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
423 428
424 429 private final CoapExchange exchange;
425 430 private final CoapTransportAdaptor coapTransportAdaptor;
  431 + private final DynamicMessage.Builder rpcRequestDynamicMessageBuilder;
426 432
427   - CoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor) {
  433 + CoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) {
428 434 this.exchange = exchange;
429 435 this.coapTransportAdaptor = coapTransportAdaptor;
  436 + this.rpcRequestDynamicMessageBuilder = rpcRequestDynamicMessageBuilder;
430 437 }
431 438
432 439 @Override
... ... @@ -457,7 +464,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
457 464 @Override
458 465 public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg msg) {
459 466 try {
460   - exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg));
  467 + exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder));
461 468 } catch (AdaptorException e) {
462 469 log.trace("Failed to reply due to error", e);
463 470 exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
... ... @@ -545,9 +552,14 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
545 552 (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
546 553 String deviceTelemetryProtoSchema = protoTransportPayloadConfiguration.getDeviceTelemetryProtoSchema();
547 554 String deviceAttributesProtoSchema = protoTransportPayloadConfiguration.getDeviceAttributesProtoSchema();
  555 + String deviceRpcRequestProtoSchema = protoTransportPayloadConfiguration.getDeviceRpcRequestProtoSchema();
  556 + String deviceRpcResponseProtoSchema = protoTransportPayloadConfiguration.getDeviceRpcResponseProtoSchema();
548 557 return new TransportConfigurationContainer(false,
549 558 protoTransportPayloadConfiguration.getTelemetryDynamicMessageDescriptor(deviceTelemetryProtoSchema),
550   - protoTransportPayloadConfiguration.getAttributesDynamicMessageDescriptor(deviceAttributesProtoSchema));
  559 + protoTransportPayloadConfiguration.getAttributesDynamicMessageDescriptor(deviceAttributesProtoSchema),
  560 + protoTransportPayloadConfiguration.getRpcResponseDynamicMessageDescriptor(deviceRpcResponseProtoSchema),
  561 + protoTransportPayloadConfiguration.getRpcRequestDynamicMessageBuilder(deviceRpcRequestProtoSchema)
  562 + );
551 563 }
552 564 } else {
553 565 throw new AdaptorException("Invalid CoapDeviceTypeConfiguration type: " + coapDeviceTypeConfiguration.getClass().getSimpleName() + "!");
... ... @@ -567,11 +579,15 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
567 579 private boolean jsonPayload;
568 580 private Descriptors.Descriptor telemetryMsgDescriptor;
569 581 private Descriptors.Descriptor attributesMsgDescriptor;
  582 + private Descriptors.Descriptor rpcResponseMsgDescriptor;
  583 + private DynamicMessage.Builder rpcRequestDynamicMessageBuilder;
570 584
571   - public TransportConfigurationContainer(boolean jsonPayload, Descriptors.Descriptor telemetryMsgDescriptor, Descriptors.Descriptor attributesMsgDescriptor) {
  585 + public TransportConfigurationContainer(boolean jsonPayload, Descriptors.Descriptor telemetryMsgDescriptor, Descriptors.Descriptor attributesMsgDescriptor, Descriptors.Descriptor rpcResponseMsgDescriptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) {
572 586 this.jsonPayload = jsonPayload;
573 587 this.telemetryMsgDescriptor = telemetryMsgDescriptor;
574 588 this.attributesMsgDescriptor = attributesMsgDescriptor;
  589 + this.rpcResponseMsgDescriptor = rpcResponseMsgDescriptor;
  590 + this.rpcRequestDynamicMessageBuilder = rpcRequestDynamicMessageBuilder;
575 591 }
576 592
577 593 public TransportConfigurationContainer(boolean jsonPayload) {
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.transport.coap.adaptors;
17 17
18 18 import com.google.protobuf.Descriptors;
  19 +import com.google.protobuf.DynamicMessage;
19 20 import org.eclipse.californium.core.coap.Request;
20 21 import org.eclipse.californium.core.coap.Response;
21 22 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
... ... @@ -32,7 +33,7 @@ public interface CoapTransportAdaptor {
32 33
33 34 TransportProtos.GetAttributeRequestMsg convertToGetAttributes(UUID sessionId, Request inbound) throws AdaptorException;
34 35
35   - TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(UUID sessionId, Request inbound) throws AdaptorException;
  36 + TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(UUID sessionId, Request inbound, Descriptors.Descriptor rpcResponseMsgDescriptor) throws AdaptorException;
36 37
37 38 TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(UUID sessionId, Request inbound) throws AdaptorException;
38 39
... ... @@ -42,7 +43,7 @@ public interface CoapTransportAdaptor {
42 43
43 44 Response convertToPublish(boolean isConfirmable, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException;
44 45
45   - Response convertToPublish(boolean isConfirmable, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException;
  46 + Response convertToPublish(boolean isConfirmable, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) throws AdaptorException;
46 47
47 48 Response convertToPublish(TransportProtos.ToServerRpcResponseMsg msg) throws AdaptorException;
48 49
... ...
... ... @@ -20,6 +20,7 @@ import com.google.gson.JsonObject;
20 20 import com.google.gson.JsonParser;
21 21 import com.google.gson.JsonSyntaxException;
22 22 import com.google.protobuf.Descriptors;
  23 +import com.google.protobuf.DynamicMessage;
23 24 import lombok.extern.slf4j.Slf4j;
24 25 import org.eclipse.californium.core.coap.CoAP;
25 26 import org.eclipse.californium.core.coap.Request;
... ... @@ -64,7 +65,7 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
64 65 }
65 66
66 67 @Override
67   - public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(UUID sessionId, Request inbound) throws AdaptorException {
  68 + public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(UUID sessionId, Request inbound, Descriptors.Descriptor rpcResponseMsgDescriptor) throws AdaptorException {
68 69 Optional<Integer> requestId = CoapTransportResource.getRequestId(inbound);
69 70 String payload = validatePayload(sessionId, inbound, false);
70 71 JsonObject response = new JsonParser().parse(payload).getAsJsonObject();
... ... @@ -95,7 +96,7 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
95 96 }
96 97
97 98 @Override
98   - public Response convertToPublish(boolean isConfirmable, TransportProtos.ToDeviceRpcRequestMsg msg) throws AdaptorException {
  99 + public Response convertToPublish(boolean isConfirmable, TransportProtos.ToDeviceRpcRequestMsg msg, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) throws AdaptorException {
99 100 return getObserveNotification(isConfirmable, JsonConverter.toJson(msg, true));
100 101 }
101 102
... ...
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.transport.coap.adaptors;
17 17
  18 +import com.google.gson.JsonElement;
18 19 import com.google.gson.JsonParser;
19 20 import com.google.protobuf.Descriptors;
20 21 import com.google.protobuf.DynamicMessage;
... ... @@ -63,16 +64,16 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor {
63 64 }
64 65
65 66 @Override
66   - public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(UUID sessionId, Request inbound) throws AdaptorException {
  67 + public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(UUID sessionId, Request inbound, Descriptors.Descriptor rpcResponseMsgDescriptor) throws AdaptorException {
67 68 Optional<Integer> requestId = CoapTransportResource.getRequestId(inbound);
68 69 if (requestId.isEmpty()) {
69 70 throw new AdaptorException("Request id is missing!");
70 71 } else {
71 72 try {
72   - String payload = TransportProtos.ToDeviceRpcResponseMsg.parseFrom(inbound.getPayload()).getPayload();
73   - return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId.get())
74   - .setPayload(payload).build();
75   - } catch (InvalidProtocolBufferException e) {
  73 + JsonElement response = new JsonParser().parse(dynamicMsgToJson(inbound.getPayload(), rpcResponseMsgDescriptor));
  74 + return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId.orElseThrow(() -> new AdaptorException("Request id is missing!")))
  75 + .setPayload(response.toString()).build();
  76 + } catch (Exception e) {
76 77 throw new AdaptorException(e);
77 78 }
78 79 }
... ... @@ -112,8 +113,8 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor {
112 113 }
113 114
114 115 @Override
115   - public Response convertToPublish(boolean isConfirmable, TransportProtos.ToDeviceRpcRequestMsg msg) throws AdaptorException {
116   - return getObserveNotification(isConfirmable, msg.toByteArray());
  116 + public Response convertToPublish(boolean isConfirmable, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) throws AdaptorException {
  117 + return getObserveNotification(isConfirmable, ProtoConverter.convertToRpcRequest(rpcRequest, rpcRequestDynamicMessageBuilder));
117 118 }
118 119
119 120 @Override
... ...
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.transport.mqtt.adaptors;
17 17
  18 +import com.google.gson.JsonElement;
18 19 import com.google.gson.JsonParser;
19 20 import com.google.protobuf.Descriptors;
20 21 import com.google.protobuf.DynamicMessage;
... ... @@ -64,9 +65,9 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
64 65 public TransportProtos.PostAttributeMsg convertToPostAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
65 66 DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx;
66 67 byte[] bytes = toBytes(inbound.payload());
67   - Descriptors.Descriptor attributesDynamicMessage = getDescriptor(deviceSessionCtx.getAttributesDynamicMessageDescriptor());
  68 + Descriptors.Descriptor attributesDynamicMessageDescriptor = getDescriptor(deviceSessionCtx.getAttributesDynamicMessageDescriptor());
68 69 try {
69   - return JsonConverter.convertToAttributesProto(new JsonParser().parse(dynamicMsgToJson(bytes, attributesDynamicMessage)));
  70 + return JsonConverter.convertToAttributesProto(new JsonParser().parse(dynamicMsgToJson(bytes, attributesDynamicMessageDescriptor)));
70 71 } catch (Exception e) {
71 72 throw new AdaptorException(e);
72 73 }
... ... @@ -86,8 +87,8 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
86 87 public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
87 88 byte[] bytes = toBytes(inbound.payload());
88 89 String topicName = inbound.variableHeader().topicName();
89   - int requestId = getRequestId(topicName, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX);
90 90 try {
  91 + int requestId = getRequestId(topicName, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX);
91 92 return ProtoConverter.convertToGetAttributeRequestMessage(bytes, requestId);
92 93 } catch (InvalidProtocolBufferException e) {
93 94 log.warn("Failed to decode get attributes request", e);
... ... @@ -97,10 +98,15 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
97 98
98 99 @Override
99 100 public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg) throws AdaptorException {
  101 + DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx;
  102 + String topicName = mqttMsg.variableHeader().topicName();
100 103 byte[] bytes = toBytes(mqttMsg.payload());
  104 + Descriptors.Descriptor rpcResponseDynamicMessageDescriptor = getDescriptor(deviceSessionCtx.getRpcResponseDynamicMessageDescriptor());
101 105 try {
102   - return TransportProtos.ToDeviceRpcResponseMsg.parseFrom(bytes);
103   - } catch (RuntimeException | InvalidProtocolBufferException e) {
  106 + int requestId = getRequestId(topicName, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC);
  107 + JsonElement response = new JsonParser().parse(dynamicMsgToJson(bytes, rpcResponseDynamicMessageDescriptor));
  108 + return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(response.toString()).build();
  109 + } catch (Exception e) {
104 110 log.warn("Failed to decode Rpc response", e);
105 111 throw new AdaptorException(e);
106 112 }
... ... @@ -145,8 +151,10 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
145 151
146 152
147 153 @Override
148   - public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
149   - return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(), ProtoConverter.convertToRpcRequest(rpcRequest)));
  154 + public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException {
  155 + DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx;
  156 + DynamicMessage.Builder rpcRequestDynamicMessageBuilder = deviceSessionCtx.getRpcRequestDynamicMessageBuilder();
  157 + return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(), ProtoConverter.convertToRpcRequest(rpcRequest, rpcRequestDynamicMessageBuilder)));
150 158 }
151 159
152 160 @Override
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.transport.mqtt.session;
17 17
18 18 import com.google.protobuf.Descriptors;
  19 +import com.google.protobuf.DynamicMessage;
19 20 import io.netty.channel.ChannelHandlerContext;
20 21 import lombok.Getter;
21 22 import lombok.Setter;
... ... @@ -60,6 +61,8 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
60 61 private volatile TransportPayloadType payloadType = TransportPayloadType.JSON;
61 62 private volatile Descriptors.Descriptor attributesDynamicMessageDescriptor;
62 63 private volatile Descriptors.Descriptor telemetryDynamicMessageDescriptor;
  64 + private volatile Descriptors.Descriptor rpcResponseDynamicMessageDescriptor;
  65 + private volatile DynamicMessage.Builder rpcRequestDynamicMessageBuilder;
63 66
64 67 @Getter
65 68 @Setter
... ... @@ -102,6 +105,14 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
102 105 return attributesDynamicMessageDescriptor;
103 106 }
104 107
  108 + public Descriptors.Descriptor getRpcResponseDynamicMessageDescriptor() {
  109 + return rpcResponseDynamicMessageDescriptor;
  110 + }
  111 +
  112 + public DynamicMessage.Builder getRpcRequestDynamicMessageBuilder() {
  113 + return rpcRequestDynamicMessageBuilder;
  114 + }
  115 +
105 116 @Override
106 117 public void setDeviceProfile(DeviceProfile deviceProfile) {
107 118 super.setDeviceProfile(deviceProfile);
... ... @@ -136,5 +147,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
136 147 ProtoTransportPayloadConfiguration protoTransportPayloadConfig = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
137 148 telemetryDynamicMessageDescriptor = protoTransportPayloadConfig.getTelemetryDynamicMessageDescriptor(protoTransportPayloadConfig.getDeviceTelemetryProtoSchema());
138 149 attributesDynamicMessageDescriptor = protoTransportPayloadConfig.getAttributesDynamicMessageDescriptor(protoTransportPayloadConfig.getDeviceAttributesProtoSchema());
  150 + rpcResponseDynamicMessageDescriptor = protoTransportPayloadConfig.getRpcResponseDynamicMessageDescriptor(protoTransportPayloadConfig.getDeviceRpcResponseProtoSchema());
  151 + rpcRequestDynamicMessageBuilder = protoTransportPayloadConfig.getRpcRequestDynamicMessageBuilder(protoTransportPayloadConfig.getDeviceRpcRequestProtoSchema());
139 152 }
140 153 }
... ...
... ... @@ -31,4 +31,8 @@ public class AdaptorException extends Exception {
31 31 super(cause);
32 32 }
33 33
  34 + public AdaptorException(String message, Exception cause) {
  35 + super(message, cause);
  36 + }
  37 +
34 38 }
... ...
... ... @@ -15,10 +15,13 @@
15 15 */
16 16 package org.thingsboard.server.common.transport.adaptor;
17 17
  18 +import com.google.gson.Gson;
18 19 import com.google.gson.JsonElement;
  20 +import com.google.gson.JsonObject;
19 21 import com.google.gson.JsonParser;
20   -import com.google.gson.JsonPrimitive;
  22 +import com.google.protobuf.DynamicMessage;
21 23 import com.google.protobuf.InvalidProtocolBufferException;
  24 +import com.google.protobuf.util.JsonFormat;
22 25 import lombok.extern.slf4j.Slf4j;
23 26 import org.springframework.util.CollectionUtils;
24 27 import org.springframework.util.StringUtils;
... ... @@ -34,6 +37,7 @@ import java.util.List;
34 37 @Slf4j
35 38 public class ProtoConverter {
36 39
  40 + public static final Gson GSON = new Gson();
37 41 public static final JsonParser JSON_PARSER = new JsonParser();
38 42
39 43 public static TransportProtos.PostTelemetryMsg convertToTelemetryProto(byte[] payload) throws InvalidProtocolBufferException, IllegalArgumentException {
... ... @@ -170,26 +174,20 @@ public class ProtoConverter {
170 174 return kvList;
171 175 }
172 176
173   - public static byte[] convertToRpcRequest(TransportProtos.ToDeviceRpcRequestMsg toDeviceRpcRequestMsg) {
174   - TransportProtos.ToDeviceRpcRequestMsg.Builder toDeviceRpcRequestMsgBuilder = toDeviceRpcRequestMsg.newBuilderForType();
175   - toDeviceRpcRequestMsgBuilder.mergeFrom(toDeviceRpcRequestMsg);
176   - toDeviceRpcRequestMsgBuilder.setParams(parseParams(toDeviceRpcRequestMsg));
177   - TransportProtos.ToDeviceRpcRequestMsg result = toDeviceRpcRequestMsgBuilder.build();
178   - return result.toByteArray();
179   - }
180   -
181   - private static String parseParams(TransportProtos.ToDeviceRpcRequestMsg toDeviceRpcRequestMsg) {
  177 + public static byte[] convertToRpcRequest(TransportProtos.ToDeviceRpcRequestMsg toDeviceRpcRequestMsg, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) throws AdaptorException {
  178 + rpcRequestDynamicMessageBuilder.clear();
  179 + JsonObject rpcRequestJson = new JsonObject();
  180 + rpcRequestJson.addProperty("method", toDeviceRpcRequestMsg.getMethodName());
  181 + rpcRequestJson.addProperty("requestId", toDeviceRpcRequestMsg.getRequestId());
182 182 String params = toDeviceRpcRequestMsg.getParams();
183   - JsonElement jsonElementParams = JSON_PARSER.parse(params);
184   - if (!jsonElementParams.isJsonPrimitive()) {
185   - return params;
186   - } else {
187   - JsonPrimitive primitiveParams = jsonElementParams.getAsJsonPrimitive();
188   - if (jsonElementParams.getAsJsonPrimitive().isString()) {
189   - return primitiveParams.getAsString();
190   - } else {
191   - return params;
192   - }
  183 + try {
  184 + JsonElement paramsElement = JSON_PARSER.parse(params);
  185 + rpcRequestJson.add("params", paramsElement);
  186 + JsonFormat.parser().ignoringUnknownFields().merge(GSON.toJson(rpcRequestJson), rpcRequestDynamicMessageBuilder);
  187 + DynamicMessage dynamicRpcRequest = rpcRequestDynamicMessageBuilder.build();
  188 + return dynamicRpcRequest.toByteArray();
  189 + } catch (Exception e) {
  190 + throw new AdaptorException("Failed to convert ToDeviceRpcRequestMsg to Dynamic Rpc request message due to: ", e);
193 191 }
194 192 }
195 193 }
... ...
... ... @@ -15,6 +15,8 @@
15 15 */
16 16 package org.thingsboard.server.dao.device;
17 17
  18 +import com.google.protobuf.Descriptors;
  19 +import com.google.protobuf.DynamicMessage;
18 20 import com.squareup.wire.Syntax;
19 21 import com.squareup.wire.schema.Field;
20 22 import com.squareup.wire.schema.Location;
... ... @@ -87,6 +89,8 @@ public class DeviceProfileServiceImpl extends AbstractEntityService implements D
87 89 private static final Location LOCATION = new Location("", "", -1, -1);
88 90 private static final String ATTRIBUTES_PROTO_SCHEMA = "attributes proto schema";
89 91 private static final String TELEMETRY_PROTO_SCHEMA = "telemetry proto schema";
  92 + private static final String RPC_REQUEST_PROTO_SCHEMA = "rpc request proto schema";
  93 + private static final String RPC_RESPONSE_PROTO_SCHEMA = "rpc response proto schema";
90 94
91 95 private static String invalidSchemaProvidedMessage(String schemaName) {
92 96 return "[Transport Configuration] invalid " + schemaName + " provided!";
... ... @@ -357,9 +361,10 @@ public class DeviceProfileServiceImpl extends AbstractEntityService implements D
357 361 if (transportConfiguration instanceof MqttDeviceProfileTransportConfiguration) {
358 362 MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
359 363 if (mqttTransportConfiguration.getTransportPayloadTypeConfiguration() instanceof ProtoTransportPayloadConfiguration) {
360   - ProtoTransportPayloadConfiguration protoTransportPayloadTypeConfiguration =
  364 + ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration =
361 365 (ProtoTransportPayloadConfiguration) mqttTransportConfiguration.getTransportPayloadTypeConfiguration();
362   - validateProtoSchemas(protoTransportPayloadTypeConfiguration);
  366 + validateProtoSchemas(protoTransportPayloadConfiguration);
  367 + validateRpcRequestDynamicMessageFields(protoTransportPayloadConfiguration);
363 368 }
364 369 } else if (transportConfiguration instanceof CoapDeviceProfileTransportConfiguration) {
365 370 CoapDeviceProfileTransportConfiguration coapDeviceProfileTransportConfiguration = (CoapDeviceProfileTransportConfiguration) transportConfiguration;
... ... @@ -370,6 +375,7 @@ public class DeviceProfileServiceImpl extends AbstractEntityService implements D
370 375 if (transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration) {
371 376 ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
372 377 validateProtoSchemas(protoTransportPayloadConfiguration);
  378 + validateRpcRequestDynamicMessageFields(protoTransportPayloadConfiguration);
373 379 }
374 380 }
375 381 }
... ... @@ -417,6 +423,8 @@ public class DeviceProfileServiceImpl extends AbstractEntityService implements D
417 423 try {
418 424 validateTransportProtoSchema(protoTransportPayloadTypeConfiguration.getDeviceAttributesProtoSchema(), ATTRIBUTES_PROTO_SCHEMA);
419 425 validateTransportProtoSchema(protoTransportPayloadTypeConfiguration.getDeviceTelemetryProtoSchema(), TELEMETRY_PROTO_SCHEMA);
  426 + validateTransportProtoSchema(protoTransportPayloadTypeConfiguration.getDeviceRpcRequestProtoSchema(), RPC_REQUEST_PROTO_SCHEMA);
  427 + validateTransportProtoSchema(protoTransportPayloadTypeConfiguration.getDeviceRpcResponseProtoSchema(), RPC_RESPONSE_PROTO_SCHEMA);
420 428 } catch (Exception exception) {
421 429 throw new DataValidationException(exception.getMessage());
422 430 }
... ... @@ -539,6 +547,48 @@ public class DeviceProfileServiceImpl extends AbstractEntityService implements D
539 547
540 548 };
541 549
  550 + private void validateRpcRequestDynamicMessageFields(ProtoTransportPayloadConfiguration protoTransportPayloadTypeConfiguration) {
  551 + DynamicMessage.Builder rpcRequestDynamicMessageBuilder = protoTransportPayloadTypeConfiguration.getRpcRequestDynamicMessageBuilder(protoTransportPayloadTypeConfiguration.getDeviceRpcRequestProtoSchema());
  552 + Descriptors.Descriptor rpcRequestDynamicMessageDescriptor = rpcRequestDynamicMessageBuilder.getDescriptorForType();
  553 + if (rpcRequestDynamicMessageDescriptor == null) {
  554 + throw new DataValidationException(invalidSchemaProvidedMessage(RPC_REQUEST_PROTO_SCHEMA) + " Failed to get rpcRequestDynamicMessageDescriptor!");
  555 + } else {
  556 + if (CollectionUtils.isEmpty(rpcRequestDynamicMessageDescriptor.getFields()) || rpcRequestDynamicMessageDescriptor.getFields().size() != 3) {
  557 + throw new DataValidationException(invalidSchemaProvidedMessage(RPC_REQUEST_PROTO_SCHEMA) + " " + rpcRequestDynamicMessageDescriptor.getName() + " message should always contains 3 fields: method, requestId and params!");
  558 + }
  559 + Descriptors.FieldDescriptor methodFieldDescriptor = rpcRequestDynamicMessageDescriptor.findFieldByName("method");
  560 + if (methodFieldDescriptor == null) {
  561 + throw new DataValidationException(invalidSchemaProvidedMessage(RPC_REQUEST_PROTO_SCHEMA) + " Failed to get field descriptor for field: method!");
  562 + } else {
  563 + if (!Descriptors.FieldDescriptor.Type.STRING.equals(methodFieldDescriptor.getType())) {
  564 + throw new DataValidationException(invalidSchemaProvidedMessage(RPC_REQUEST_PROTO_SCHEMA) + " Field 'method' has invalid data type. Only string type is supported!");
  565 + }
  566 + if (methodFieldDescriptor.isRepeated()) {
  567 + throw new DataValidationException(invalidSchemaProvidedMessage(RPC_REQUEST_PROTO_SCHEMA) + " Field 'method' has invalid label!");
  568 + }
  569 + }
  570 + Descriptors.FieldDescriptor requestIdFieldDescriptor = rpcRequestDynamicMessageDescriptor.findFieldByName("requestId");
  571 + if (requestIdFieldDescriptor == null) {
  572 + throw new DataValidationException(invalidSchemaProvidedMessage(RPC_REQUEST_PROTO_SCHEMA) + " Failed to get field descriptor for field: requestId!");
  573 + } else {
  574 + if (!Descriptors.FieldDescriptor.Type.INT32.equals(requestIdFieldDescriptor.getType())) {
  575 + throw new DataValidationException(invalidSchemaProvidedMessage(RPC_REQUEST_PROTO_SCHEMA) + " Field 'requestId' has invalid data type. Only int32 type is supported!");
  576 + }
  577 + if (requestIdFieldDescriptor.isRepeated()) {
  578 + throw new DataValidationException(invalidSchemaProvidedMessage(RPC_REQUEST_PROTO_SCHEMA) + " Field 'requestId' has invalid label!");
  579 + }
  580 + }
  581 + Descriptors.FieldDescriptor paramsFieldDescriptor = rpcRequestDynamicMessageDescriptor.findFieldByName("params");
  582 + if (paramsFieldDescriptor == null) {
  583 + throw new DataValidationException(invalidSchemaProvidedMessage(RPC_REQUEST_PROTO_SCHEMA) + " Failed to get field descriptor for field: params!");
  584 + } else {
  585 + if (paramsFieldDescriptor.isRepeated()) {
  586 + throw new DataValidationException(invalidSchemaProvidedMessage(RPC_REQUEST_PROTO_SCHEMA) + "Field 'params' has invalid label!");
  587 + }
  588 + }
  589 + }
  590 + }
  591 +
542 592 private PaginatedRemover<TenantId, DeviceProfile> tenantDeviceProfilesRemover =
543 593 new PaginatedRemover<TenantId, DeviceProfile>() {
544 594
... ...
... ... @@ -61,6 +61,21 @@
61 61 {{ 'device-profile.attributes-proto-schema-required' | translate}}
62 62 </mat-error>
63 63 </mat-form-field>
  64 + <mat-form-field style="padding-bottom: 20px" fxFlex>
  65 + <mat-label translate>device-profile.rpc-request-proto-schema</mat-label>
  66 + <textarea matInput required formControlName="deviceRpcRequestProtoSchema" rows="5"></textarea>
  67 + <mat-error *ngIf="coapDeviceProfileTransportConfigurationFormGroup.get('coapDeviceTypeConfiguration.transportPayloadTypeConfiguration.deviceRpcRequestProtoSchema').hasError('required')">
  68 + {{ 'device-profile.rpc-request-proto-schema-required' | translate}}
  69 + </mat-error>
  70 + <mat-hint class="tb-hint" translate>device-profile.rpc-request-proto-schema-hint</mat-hint>
  71 + </mat-form-field>
  72 + <mat-form-field fxFlex>
  73 + <mat-label translate>device-profile.rpc-response-proto-schema</mat-label>
  74 + <textarea matInput required formControlName="deviceRpcResponseProtoSchema" rows="5"></textarea>
  75 + <mat-error *ngIf="coapDeviceProfileTransportConfigurationFormGroup.get('coapDeviceTypeConfiguration.transportPayloadTypeConfiguration.deviceRpcResponseProtoSchema').hasError('required')">
  76 + {{ 'device-profile.rpc-response-proto-schema-required' | translate}}
  77 + </mat-error>
  78 + </mat-form-field>
64 79 </div>
65 80 </div>
66 81 </fieldset>
... ...
... ... @@ -24,6 +24,8 @@ import {
24 24 coapDeviceTypeTranslationMap,
25 25 CoapTransportDeviceType,
26 26 defaultAttributesSchema,
  27 + defaultRpcRequestSchema,
  28 + defaultRpcResponseSchema,
27 29 defaultTelemetrySchema,
28 30 DeviceProfileTransportConfiguration,
29 31 DeviceTransportType,
... ... @@ -59,7 +61,9 @@ export class CoapDeviceProfileTransportConfigurationComponent implements Control
59 61 private transportPayloadTypeConfiguration = this.fb.group({
60 62 transportPayloadType: [TransportPayloadType.JSON, Validators.required],
61 63 deviceTelemetryProtoSchema: [defaultTelemetrySchema, Validators.required],
62   - deviceAttributesProtoSchema: [defaultAttributesSchema, Validators.required]
  64 + deviceAttributesProtoSchema: [defaultAttributesSchema, Validators.required],
  65 + deviceRpcRequestProtoSchema: [defaultRpcRequestSchema, Validators.required],
  66 + deviceRpcResponseProtoSchema: [defaultRpcResponseSchema, Validators.required]
63 67 });
64 68
65 69 get required(): boolean {
... ...
... ... @@ -88,6 +88,21 @@
88 88 {{ 'device-profile.attributes-proto-schema-required' | translate}}
89 89 </mat-error>
90 90 </mat-form-field>
  91 + <mat-form-field style="padding-bottom: 20px" fxFlex>
  92 + <mat-label translate>device-profile.rpc-request-proto-schema</mat-label>
  93 + <textarea matInput required formControlName="deviceRpcRequestProtoSchema" rows="5"></textarea>
  94 + <mat-error *ngIf="mqttDeviceProfileTransportConfigurationFormGroup.get('transportPayloadTypeConfiguration.deviceRpcRequestProtoSchema').hasError('required')">
  95 + {{ 'device-profile.rpc-request-proto-schema-required' | translate}}
  96 + </mat-error>
  97 + <mat-hint class="tb-hint" translate>device-profile.rpc-request-proto-schema-hint</mat-hint>
  98 + </mat-form-field>
  99 + <mat-form-field fxFlex>
  100 + <mat-label translate>device-profile.rpc-response-proto-schema</mat-label>
  101 + <textarea matInput required formControlName="deviceRpcResponseProtoSchema" rows="5"></textarea>
  102 + <mat-error *ngIf="mqttDeviceProfileTransportConfigurationFormGroup.get('transportPayloadTypeConfiguration.deviceRpcResponseProtoSchema').hasError('required')">
  103 + {{ 'device-profile.rpc-response-proto-schema-required' | translate}}
  104 + </mat-error>
  105 + </mat-form-field>
91 106 </div>
92 107 </div>
93 108 </fieldset>
... ...
... ... @@ -29,6 +29,8 @@ import { AppState } from '@app/core/core.state';
29 29 import { coerceBooleanProperty } from '@angular/cdk/coercion';
30 30 import {
31 31 defaultAttributesSchema,
  32 + defaultRpcRequestSchema,
  33 + defaultRpcResponseSchema,
32 34 defaultTelemetrySchema,
33 35 DeviceProfileTransportConfiguration,
34 36 DeviceTransportType,
... ... @@ -90,7 +92,9 @@ export class MqttDeviceProfileTransportConfigurationComponent implements Control
90 92 transportPayloadTypeConfiguration: this.fb.group({
91 93 transportPayloadType: [TransportPayloadType.JSON, Validators.required],
92 94 deviceTelemetryProtoSchema: [defaultTelemetrySchema, Validators.required],
93   - deviceAttributesProtoSchema: [defaultAttributesSchema, Validators.required]
  95 + deviceAttributesProtoSchema: [defaultAttributesSchema, Validators.required],
  96 + deviceRpcRequestProtoSchema: [defaultRpcRequestSchema, Validators.required],
  97 + deviceRpcResponseProtoSchema: [defaultRpcResponseSchema, Validators.required]
94 98 })
95 99 }, {validator: this.uniqueDeviceTopicValidator}
96 100 );
... ... @@ -139,15 +143,21 @@ export class MqttDeviceProfileTransportConfigurationComponent implements Control
139 143 if (forceUpdated) {
140 144 transportPayloadTypeForm.patchValue({
141 145 deviceTelemetryProtoSchema: defaultTelemetrySchema,
142   - deviceAttributesProtoSchema: defaultAttributesSchema
  146 + deviceAttributesProtoSchema: defaultAttributesSchema,
  147 + deviceRpcRequestProtoSchema: defaultRpcRequestSchema,
  148 + deviceRpcResponseProtoSchema: defaultRpcResponseSchema
143 149 }, {emitEvent: false});
144 150 }
145 151 if (type === TransportPayloadType.PROTOBUF && !this.disabled) {
146 152 transportPayloadTypeForm.get('deviceTelemetryProtoSchema').enable({emitEvent: false});
147 153 transportPayloadTypeForm.get('deviceAttributesProtoSchema').enable({emitEvent: false});
  154 + transportPayloadTypeForm.get('deviceRpcRequestProtoSchema').enable({emitEvent: false});
  155 + transportPayloadTypeForm.get('deviceRpcResponseProtoSchema').enable({emitEvent: false});
148 156 } else {
149 157 transportPayloadTypeForm.get('deviceTelemetryProtoSchema').disable({emitEvent: false});
150 158 transportPayloadTypeForm.get('deviceAttributesProtoSchema').disable({emitEvent: false});
  159 + transportPayloadTypeForm.get('deviceRpcRequestProtoSchema').enable({emitEvent: false});
  160 + transportPayloadTypeForm.get('deviceRpcResponseProtoSchema').disable({emitEvent: false});
151 161 }
152 162 }
153 163
... ...
... ... @@ -140,6 +140,24 @@ export const defaultAttributesSchema =
140 140 ' string serialNumber = 2;\n' +
141 141 '}';
142 142
  143 +export const defaultRpcRequestSchema =
  144 + 'syntax ="proto3";\n' +
  145 + 'package rpc;\n' +
  146 + '\n' +
  147 + 'message RpcRequestMsg {\n' +
  148 + ' string method = 1;\n' +
  149 + ' int32 requestId = 2;\n' +
  150 + ' string params = 3;\n' +
  151 + '}';
  152 +
  153 +export const defaultRpcResponseSchema =
  154 + 'syntax ="proto3";\n' +
  155 + 'package rpc;\n' +
  156 + '\n' +
  157 + 'message RpcResponseMsg {\n' +
  158 + ' string payload = 1;\n' +
  159 + '}';
  160 +
143 161 export const coapDeviceTypeTranslationMap = new Map<CoapTransportDeviceType, string>(
144 162 [
145 163 [CoapTransportDeviceType.DEFAULT, 'device-profile.coap-device-type-default'],
... ... @@ -170,6 +188,13 @@ export const deviceTransportTypeConfigurationInfoMap = new Map<DeviceTransportTy
170 188 hasProfileConfiguration: true,
171 189 hasDeviceConfiguration: false,
172 190 }
  191 + ],
  192 + [
  193 + DeviceTransportType.COAP,
  194 + {
  195 + hasProfileConfiguration: true,
  196 + hasDeviceConfiguration: false,
  197 + }
173 198 ]
174 199 ]
175 200 );
... ... @@ -270,8 +295,10 @@ export function createDeviceProfileTransportConfiguration(type: DeviceTransportT
270 295 break;
271 296 case DeviceTransportType.COAP:
272 297 const coapTransportConfiguration: CoapDeviceProfileTransportConfiguration = {
273   - coapDeviceTypeConfiguration: {coapDeviceType: CoapTransportDeviceType.DEFAULT,
274   - transportPayloadTypeConfiguration: {transportPayloadType: TransportPayloadType.JSON}}
  298 + coapDeviceTypeConfiguration: {
  299 + coapDeviceType: CoapTransportDeviceType.DEFAULT,
  300 + transportPayloadTypeConfiguration: {transportPayloadType: TransportPayloadType.JSON}
  301 + }
275 302 };
276 303 transportConfiguration = {...coapTransportConfiguration, type: DeviceTransportType.COAP};
277 304 break;
... ...
... ... @@ -1063,8 +1063,13 @@
1063 1063 "telemetry-proto-schema-required": "Telemetry proto schema is required.",
1064 1064 "attributes-proto-schema": "Attributes proto schema",
1065 1065 "attributes-proto-schema-required": "Attributes proto schema is required.",
  1066 + "rpc-response-proto-schema": "RPC response proto schema",
  1067 + "rpc-response-proto-schema-required": "RPC response proto schema is required.",
1066 1068 "rpc-response-topic-filter": "RPC response topic filter",
1067 1069 "rpc-response-topic-filter-required": "RPC response topic filter is required.",
  1070 + "rpc-request-proto-schema": "RPC request proto schema",
  1071 + "rpc-request-proto-schema-required": "RPC request proto schema is required.",
  1072 + "rpc-request-proto-schema-hint": "RPC request message should have always fields: string method = 1; int32 requestId = 2; and params = 3 of any data type.",
1068 1073 "not-valid-pattern-topic-filter": "Not valid pattern topic filter",
1069 1074 "not-valid-single-character": "Invalid use of a single-level wildcard character",
1070 1075 "not-valid-multi-character": "Invalid use of a multi-level wildcard character",
... ...