Commit a1e4bbbb31cca6360f77dd838d83872e646ed917

Authored by Viacheslav Klimov
1 parent 783b9595

Implement RPC for SNMP; refactor

@@ -40,7 +40,7 @@ public class SnmpDeviceTransportConfiguration implements DeviceTransportConfigur @@ -40,7 +40,7 @@ public class SnmpDeviceTransportConfiguration implements DeviceTransportConfigur
40 private String community; 40 private String community;
41 41
42 /* 42 /*
43 - * For SNMP v3 with User Based Security Model 43 + * For SNMP v3
44 * */ 44 * */
45 private String username; 45 private String username;
46 private String securityName; 46 private String securityName;
@@ -44,7 +44,7 @@ public class SnmpDeviceProfileTransportConfiguration implements DeviceProfileTra @@ -44,7 +44,7 @@ public class SnmpDeviceProfileTransportConfiguration implements DeviceProfileTra
44 @JsonIgnore 44 @JsonIgnore
45 private boolean isValid() { 45 private boolean isValid() {
46 return timeoutMs != null && timeoutMs >= 0 && retries != null && retries >= 0 46 return timeoutMs != null && timeoutMs >= 0 && retries != null && retries >= 0
47 - && communicationConfigs != null && !communicationConfigs.isEmpty() 47 + && communicationConfigs != null
48 && communicationConfigs.stream().allMatch(config -> config != null && config.isValid()) 48 && communicationConfigs.stream().allMatch(config -> config != null && config.isValid())
49 && communicationConfigs.stream().flatMap(config -> config.getAllMappings().stream()).map(SnmpMapping::getOid) 49 && communicationConfigs.stream().flatMap(config -> config.getAllMappings().stream()).map(SnmpMapping::getOid)
50 .distinct().count() == communicationConfigs.stream().mapToInt(config -> config.getAllMappings().size()).sum(); 50 .distinct().count() == communicationConfigs.stream().mapToInt(config -> config.getAllMappings().size()).sum();
@@ -21,6 +21,5 @@ public enum SnmpCommunicationSpec { @@ -21,6 +21,5 @@ public enum SnmpCommunicationSpec {
21 CLIENT_ATTRIBUTES_QUERYING, 21 CLIENT_ATTRIBUTES_QUERYING,
22 SHARED_ATTRIBUTES_SETTING, 22 SHARED_ATTRIBUTES_SETTING,
23 23
24 - TO_DEVICE_RPC_COMMAND_SETTING,  
25 - TO_DEVICE_RPC_RESPONSE_QUERYING 24 + TO_DEVICE_RPC_REQUEST,
26 } 25 }
1 -/**  
2 - * Copyright © 2016-2021 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.common.data.transport.snmp.config.impl;  
17 -  
18 -import lombok.Data;  
19 -import org.thingsboard.server.common.data.kv.DataType;  
20 -import org.thingsboard.server.common.data.transport.snmp.SnmpCommunicationSpec;  
21 -import org.thingsboard.server.common.data.transport.snmp.SnmpMapping;  
22 -import org.thingsboard.server.common.data.transport.snmp.SnmpMethod;  
23 -import org.thingsboard.server.common.data.transport.snmp.config.SnmpCommunicationConfig;  
24 -  
25 -import java.util.Arrays;  
26 -import java.util.Collections;  
27 -import java.util.List;  
28 -  
29 -@Data  
30 -public class ToDeviceRpcCommandSettingSnmpCommunicationConfig implements SnmpCommunicationConfig {  
31 - private SnmpMapping mapping;  
32 -  
33 - @Override  
34 - public SnmpCommunicationSpec getSpec() {  
35 - return SnmpCommunicationSpec.TO_DEVICE_RPC_COMMAND_SETTING;  
36 - }  
37 -  
38 - @Override  
39 - public SnmpMethod getMethod() {  
40 - return SnmpMethod.SET;  
41 - }  
42 -  
43 - public void setMapping(SnmpMapping mapping) {  
44 - this.mapping = mapping != null ? new SnmpMapping(mapping.getOid(), RPC_COMMAND_KEY_NAME, DataType.STRING) : null;  
45 - }  
46 -  
47 - @Override  
48 - public List<SnmpMapping> getAllMappings() {  
49 - return Collections.singletonList(mapping);  
50 - }  
51 -  
52 - @Override  
53 - public boolean isValid() {  
54 - return mapping != null && mapping.isValid();  
55 - }  
56 -  
57 - public static final String RPC_COMMAND_KEY_NAME = "rpcCommand";  
58 -  
59 -}  
1 -/**  
2 - * Copyright © 2016-2021 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.common.data.transport.snmp.config.impl;  
17 -  
18 -import lombok.Data;  
19 -import lombok.EqualsAndHashCode;  
20 -import org.thingsboard.server.common.data.kv.DataType;  
21 -import org.thingsboard.server.common.data.transport.snmp.SnmpCommunicationSpec;  
22 -import org.thingsboard.server.common.data.transport.snmp.SnmpMapping;  
23 -import org.thingsboard.server.common.data.transport.snmp.SnmpMethod;  
24 -import org.thingsboard.server.common.data.transport.snmp.config.RepeatingQueryingSnmpCommunicationConfig;  
25 -  
26 -import java.util.Collections;  
27 -import java.util.List;  
28 -  
29 -@EqualsAndHashCode(callSuper = true)  
30 -@Data  
31 -public class ToDeviceRpcResponseQueryingSnmpCommunicationConfig extends RepeatingQueryingSnmpCommunicationConfig {  
32 - private SnmpMapping mapping;  
33 -  
34 - @Override  
35 - public SnmpCommunicationSpec getSpec() {  
36 - return SnmpCommunicationSpec.TO_DEVICE_RPC_RESPONSE_QUERYING;  
37 - }  
38 -  
39 - @Override  
40 - public SnmpMethod getMethod() {  
41 - return SnmpMethod.GET;  
42 - }  
43 -  
44 - public void setMapping(SnmpMapping mapping) {  
45 - this.mapping = mapping != null ? new SnmpMapping(mapping.getOid(), RPC_RESPONSE_KEY_NAME, DataType.STRING) : null;  
46 - }  
47 -  
48 - @Override  
49 - public List<SnmpMapping> getAllMappings() {  
50 - return Collections.singletonList(mapping);  
51 - }  
52 -  
53 - @Override  
54 - public boolean isValid() {  
55 - return true;  
56 - }  
57 -  
58 - public static final String RPC_RESPONSE_KEY_NAME = "rpcResponse";  
59 -  
60 -}  
@@ -152,6 +152,7 @@ public class SnmpTransportContext extends TransportContext { @@ -152,6 +152,7 @@ public class SnmpTransportContext extends TransportContext {
152 } 152 }
153 } catch (Exception e) { 153 } catch (Exception e) {
154 log.error("Failed to update session for SNMP device {}: {}", sessionContext.getDeviceId(), e.getMessage()); 154 log.error("Failed to update session for SNMP device {}: {}", sessionContext.getDeviceId(), e.getMessage());
  155 + destroyDeviceSession(sessionContext);
155 } 156 }
156 } 157 }
157 158
common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/PduService.java renamed from common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/PduMapper.java
@@ -29,6 +29,7 @@ import org.springframework.stereotype.Service; @@ -29,6 +29,7 @@ import org.springframework.stereotype.Service;
29 import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration; 29 import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration;
30 import org.thingsboard.server.common.data.kv.DataType; 30 import org.thingsboard.server.common.data.kv.DataType;
31 import org.thingsboard.server.common.data.transport.snmp.SnmpMapping; 31 import org.thingsboard.server.common.data.transport.snmp.SnmpMapping;
  32 +import org.thingsboard.server.common.data.transport.snmp.SnmpMethod;
32 import org.thingsboard.server.common.data.transport.snmp.SnmpProtocolVersion; 33 import org.thingsboard.server.common.data.transport.snmp.SnmpProtocolVersion;
33 import org.thingsboard.server.common.data.transport.snmp.config.SnmpCommunicationConfig; 34 import org.thingsboard.server.common.data.transport.snmp.config.SnmpCommunicationConfig;
34 import org.thingsboard.server.queue.util.TbSnmpTransportComponent; 35 import org.thingsboard.server.queue.util.TbSnmpTransportComponent;
@@ -45,32 +46,16 @@ import java.util.stream.IntStream; @@ -45,32 +46,16 @@ import java.util.stream.IntStream;
45 @TbSnmpTransportComponent 46 @TbSnmpTransportComponent
46 @Service 47 @Service
47 @Slf4j 48 @Slf4j
48 -public class PduMapper { 49 +public class PduService {
49 public PDU createPdu(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig, Map<String, String> values) { 50 public PDU createPdu(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig, Map<String, String> values) {
50 - PDU pdu;  
51 - SnmpDeviceTransportConfiguration deviceTransportConfiguration = sessionContext.getDeviceTransportConfiguration();  
52 - SnmpProtocolVersion snmpVersion = deviceTransportConfiguration.getProtocolVersion();  
53 - switch (snmpVersion) {  
54 - case V1:  
55 - case V2C:  
56 - pdu = new PDU();  
57 - break;  
58 - case V3:  
59 - ScopedPDU scopedPdu = new ScopedPDU();  
60 - scopedPdu.setContextName(new OctetString(deviceTransportConfiguration.getContextName()));  
61 - scopedPdu.setContextEngineID(new OctetString(deviceTransportConfiguration.getEngineId()));  
62 - pdu = scopedPdu;  
63 - break;  
64 - default:  
65 - throw new UnsupportedOperationException("SNMP version " + snmpVersion + " is not supported");  
66 - } 51 + PDU pdu = setUpPdu(sessionContext);
67 52
68 pdu.setType(communicationConfig.getMethod().getCode()); 53 pdu.setType(communicationConfig.getMethod().getCode());
69 pdu.addAll(communicationConfig.getAllMappings().stream() 54 pdu.addAll(communicationConfig.getAllMappings().stream()
70 .filter(mapping -> values.isEmpty() || values.containsKey(mapping.getKey())) 55 .filter(mapping -> values.isEmpty() || values.containsKey(mapping.getKey()))
71 .map(mapping -> Optional.ofNullable(values.get(mapping.getKey())) 56 .map(mapping -> Optional.ofNullable(values.get(mapping.getKey()))
72 .map(value -> { 57 .map(value -> {
73 - Variable variable = toSnmpVariable(mapping, value); 58 + Variable variable = toSnmpVariable(value, mapping.getDataType());
74 return new VariableBinding(new OID(mapping.getOid()), variable); 59 return new VariableBinding(new OID(mapping.getOid()), variable);
75 }) 60 })
76 .orElseGet(() -> new VariableBinding(new OID(mapping.getOid())))) 61 .orElseGet(() -> new VariableBinding(new OID(mapping.getOid()))))
@@ -79,9 +64,20 @@ public class PduMapper { @@ -79,9 +64,20 @@ public class PduMapper {
79 return pdu; 64 return pdu;
80 } 65 }
81 66
82 - private Variable toSnmpVariable(SnmpMapping mapping, String value) { 67 + public PDU createSingleVariablePdu(DeviceSessionContext sessionContext, SnmpMethod snmpMethod, String oid, String value, DataType dataType) {
  68 + PDU pdu = setUpPdu(sessionContext);
  69 + pdu.setType(snmpMethod.getCode());
  70 +
  71 + Variable variable = value == null ? Null.instance : toSnmpVariable(value, dataType);
  72 + pdu.add(new VariableBinding(new OID(oid), variable));
  73 +
  74 + return pdu;
  75 + }
  76 +
  77 + private Variable toSnmpVariable(String value, DataType dataType) {
  78 + dataType = dataType == null ? DataType.STRING : dataType;
83 Variable variable; 79 Variable variable;
84 - switch (mapping.getDataType()) { 80 + switch (dataType) {
85 case LONG: 81 case LONG:
86 try { 82 try {
87 variable = new Integer32(Integer.parseInt(value)); 83 variable = new Integer32(Integer.parseInt(value));
@@ -98,37 +94,62 @@ public class PduMapper { @@ -98,37 +94,62 @@ public class PduMapper {
98 return variable; 94 return variable;
99 } 95 }
100 96
  97 + private PDU setUpPdu(DeviceSessionContext sessionContext) {
  98 + PDU pdu;
  99 + SnmpDeviceTransportConfiguration deviceTransportConfiguration = sessionContext.getDeviceTransportConfiguration();
  100 + SnmpProtocolVersion snmpVersion = deviceTransportConfiguration.getProtocolVersion();
  101 + switch (snmpVersion) {
  102 + case V1:
  103 + case V2C:
  104 + pdu = new PDU();
  105 + break;
  106 + case V3:
  107 + ScopedPDU scopedPdu = new ScopedPDU();
  108 + scopedPdu.setContextName(new OctetString(deviceTransportConfiguration.getContextName()));
  109 + scopedPdu.setContextEngineID(new OctetString(deviceTransportConfiguration.getEngineId()));
  110 + pdu = scopedPdu;
  111 + break;
  112 + default:
  113 + throw new UnsupportedOperationException("SNMP version " + snmpVersion + " is not supported");
  114 + }
  115 + return pdu;
  116 + }
101 117
102 - public JsonObject processPdu(PDU pdu, DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig) {  
103 - List<VariableBinding> variablesBindings = IntStream.range(0, pdu.size())  
104 - .mapToObj(pdu::get)  
105 - .filter(Objects::nonNull)  
106 - .filter(variableBinding -> !(variableBinding.getVariable() instanceof Null))  
107 - .collect(Collectors.toList());  
108 - JsonObject data = new JsonObject(); 118 +
  119 + public JsonObject processPdu(PDU pdu, List<SnmpMapping> responseMappings) {
  120 + Map<OID, String> values = processPdu(pdu);
109 121
110 Map<OID, SnmpMapping> mappings = new HashMap<>(); 122 Map<OID, SnmpMapping> mappings = new HashMap<>();
111 - for (SnmpMapping mapping : communicationConfig.getAllMappings()) {  
112 - OID oid = new OID(mapping.getOid());  
113 - mappings.put(oid, mapping); 123 + if (responseMappings != null) {
  124 + for (SnmpMapping mapping : responseMappings) {
  125 + OID oid = new OID(mapping.getOid());
  126 + mappings.put(oid, mapping);
  127 + }
114 } 128 }
115 129
116 - variablesBindings.forEach(variableBinding -> {  
117 - log.trace("Processing variable binding: {}", variableBinding); 130 + JsonObject data = new JsonObject();
  131 + values.forEach((oid, value) -> {
  132 + log.trace("Processing variable binding: {} - {}", oid, value);
118 133
119 - OID oid = variableBinding.getOid();  
120 SnmpMapping mapping = mappings.get(oid); 134 SnmpMapping mapping = mappings.get(oid);
121 if (mapping == null) { 135 if (mapping == null) {
122 log.debug("No SNMP mapping for oid {}", oid); 136 log.debug("No SNMP mapping for oid {}", oid);
123 return; 137 return;
124 } 138 }
125 139
126 - processValue(mapping.getKey(), mapping.getDataType(), variableBinding.toValueString(), data); 140 + processValue(mapping.getKey(), mapping.getDataType(), value, data);
127 }); 141 });
128 142
129 return data; 143 return data;
130 } 144 }
131 145
  146 + public Map<OID, String> processPdu(PDU pdu) {
  147 + return IntStream.range(0, pdu.size())
  148 + .mapToObj(pdu::get)
  149 + .filter(Objects::nonNull)
  150 + .filter(variableBinding -> !(variableBinding.getVariable() instanceof Null))
  151 + .collect(Collectors.toMap(VariableBinding::getOid, VariableBinding::toValueString));
  152 + }
132 153
133 private void processValue(String key, DataType dataType, String value, JsonObject result) { 154 private void processValue(String key, DataType dataType, String value, JsonObject result) {
134 switch (dataType) { 155 switch (dataType) {
@@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
15 */ 15 */
16 package org.thingsboard.server.transport.snmp.service; 16 package org.thingsboard.server.transport.snmp.service;
17 17
  18 +import com.google.gson.JsonElement;
18 import com.google.gson.JsonObject; 19 import com.google.gson.JsonObject;
19 import lombok.Data; 20 import lombok.Data;
20 import lombok.Getter; 21 import lombok.Getter;
@@ -35,11 +36,12 @@ import org.springframework.beans.factory.annotation.Value; @@ -35,11 +36,12 @@ import org.springframework.beans.factory.annotation.Value;
35 import org.springframework.stereotype.Service; 36 import org.springframework.stereotype.Service;
36 import org.thingsboard.common.util.ThingsBoardThreadFactory; 37 import org.thingsboard.common.util.ThingsBoardThreadFactory;
37 import org.thingsboard.server.common.data.TbTransportService; 38 import org.thingsboard.server.common.data.TbTransportService;
38 -import org.thingsboard.server.common.data.id.DeviceProfileId; 39 +import org.thingsboard.server.common.data.kv.DataType;
39 import org.thingsboard.server.common.data.transport.snmp.SnmpCommunicationSpec; 40 import org.thingsboard.server.common.data.transport.snmp.SnmpCommunicationSpec;
  41 +import org.thingsboard.server.common.data.transport.snmp.SnmpMapping;
  42 +import org.thingsboard.server.common.data.transport.snmp.SnmpMethod;
40 import org.thingsboard.server.common.data.transport.snmp.config.RepeatingQueryingSnmpCommunicationConfig; 43 import org.thingsboard.server.common.data.transport.snmp.config.RepeatingQueryingSnmpCommunicationConfig;
41 import org.thingsboard.server.common.data.transport.snmp.config.SnmpCommunicationConfig; 44 import org.thingsboard.server.common.data.transport.snmp.config.SnmpCommunicationConfig;
42 -import org.thingsboard.server.common.data.transport.snmp.config.impl.ToDeviceRpcResponseQueryingSnmpCommunicationConfig;  
43 import org.thingsboard.server.common.transport.TransportService; 45 import org.thingsboard.server.common.transport.TransportService;
44 import org.thingsboard.server.common.transport.TransportServiceCallback; 46 import org.thingsboard.server.common.transport.TransportServiceCallback;
45 import org.thingsboard.server.common.transport.adaptor.JsonConverter; 47 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
@@ -50,10 +52,12 @@ import org.thingsboard.server.transport.snmp.session.DeviceSessionContext; @@ -50,10 +52,12 @@ import org.thingsboard.server.transport.snmp.session.DeviceSessionContext;
50 import javax.annotation.PostConstruct; 52 import javax.annotation.PostConstruct;
51 import javax.annotation.PreDestroy; 53 import javax.annotation.PreDestroy;
52 import java.io.IOException; 54 import java.io.IOException;
  55 +import java.util.Arrays;
53 import java.util.Collections; 56 import java.util.Collections;
54 import java.util.EnumMap; 57 import java.util.EnumMap;
55 import java.util.List; 58 import java.util.List;
56 import java.util.Map; 59 import java.util.Map;
  60 +import java.util.Optional;
57 import java.util.concurrent.ExecutorService; 61 import java.util.concurrent.ExecutorService;
58 import java.util.concurrent.Executors; 62 import java.util.concurrent.Executors;
59 import java.util.concurrent.ScheduledExecutorService; 63 import java.util.concurrent.ScheduledExecutorService;
@@ -67,13 +71,14 @@ import java.util.stream.Collectors; @@ -67,13 +71,14 @@ import java.util.stream.Collectors;
67 @RequiredArgsConstructor 71 @RequiredArgsConstructor
68 public class SnmpTransportService implements TbTransportService { 72 public class SnmpTransportService implements TbTransportService {
69 private final TransportService transportService; 73 private final TransportService transportService;
70 - private final PduMapper pduMapper; 74 + private final PduService pduService;
71 75
72 @Getter 76 @Getter
73 private Snmp snmp; 77 private Snmp snmp;
74 private ScheduledExecutorService queryingExecutor; 78 private ScheduledExecutorService queryingExecutor;
75 private ExecutorService responseProcessingExecutor; 79 private ExecutorService responseProcessingExecutor;
76 80
  81 + private final Map<SnmpCommunicationSpec, ResponseDataMapper> responseDataMappers = new EnumMap<>(SnmpCommunicationSpec.class);
77 private final Map<SnmpCommunicationSpec, ResponseProcessor> responseProcessors = new EnumMap<>(SnmpCommunicationSpec.class); 82 private final Map<SnmpCommunicationSpec, ResponseProcessor> responseProcessors = new EnumMap<>(SnmpCommunicationSpec.class);
78 83
79 @Value("${transport.snmp.response_processing.parallelism_level}") 84 @Value("${transport.snmp.response_processing.parallelism_level}")
@@ -87,6 +92,7 @@ public class SnmpTransportService implements TbTransportService { @@ -87,6 +92,7 @@ public class SnmpTransportService implements TbTransportService {
87 responseProcessingExecutor = Executors.newWorkStealingPool(responseProcessingParallelismLevel); 92 responseProcessingExecutor = Executors.newWorkStealingPool(responseProcessingParallelismLevel);
88 93
89 initializeSnmp(); 94 initializeSnmp();
  95 + configureResponseDataMappers();
90 configureResponseProcessors(); 96 configureResponseProcessors();
91 97
92 log.info("SNMP transport service initialized"); 98 log.info("SNMP transport service initialized");
@@ -138,16 +144,19 @@ public class SnmpTransportService implements TbTransportService { @@ -138,16 +144,19 @@ public class SnmpTransportService implements TbTransportService {
138 } 144 }
139 145
140 146
141 - public void sendRequest(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig) { 147 + private void sendRequest(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig) {
142 sendRequest(sessionContext, communicationConfig, Collections.emptyMap()); 148 sendRequest(sessionContext, communicationConfig, Collections.emptyMap());
143 } 149 }
144 150
145 - public void sendRequest(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig, Map<String, String> values) {  
146 - PDU request = pduMapper.createPdu(sessionContext, communicationConfig, values); 151 + private void sendRequest(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig, Map<String, String> values) {
  152 + PDU request = pduService.createPdu(sessionContext, communicationConfig, values);
  153 + RequestInfo requestInfo = new RequestInfo(communicationConfig.getSpec(), communicationConfig.getAllMappings());
  154 + sendRequest(sessionContext, request, requestInfo);
  155 + }
147 156
  157 + private void sendRequest(DeviceSessionContext sessionContext, PDU request, RequestInfo requestInfo) {
148 if (request.size() > 0) { 158 if (request.size() > 0) {
149 log.trace("Executing SNMP request for device {}. Variables bindings: {}", sessionContext.getDeviceId(), request.getVariableBindings()); 159 log.trace("Executing SNMP request for device {}. Variables bindings: {}", sessionContext.getDeviceId(), request.getVariableBindings());
150 - RequestInfo requestInfo = new RequestInfo(sessionContext.getDeviceProfile().getId(), communicationConfig);  
151 try { 160 try {
152 snmp.send(request, sessionContext.getTarget(), requestInfo, sessionContext); 161 snmp.send(request, sessionContext.getTarget(), requestInfo, sessionContext);
153 } catch (IOException e) { 162 } catch (IOException e) {
@@ -156,6 +165,39 @@ public class SnmpTransportService implements TbTransportService { @@ -156,6 +165,39 @@ public class SnmpTransportService implements TbTransportService {
156 } 165 }
157 } 166 }
158 167
  168 + public void onAttributeUpdate(DeviceSessionContext sessionContext, TransportProtos.AttributeUpdateNotificationMsg attributeUpdateNotification) {
  169 + sessionContext.getProfileTransportConfiguration().getCommunicationConfigs().stream()
  170 + .filter(config -> config.getSpec() == SnmpCommunicationSpec.SHARED_ATTRIBUTES_SETTING)
  171 + .findFirst()
  172 + .ifPresent(communicationConfig -> {
  173 + Map<String, String> sharedAttributes = JsonConverter.toJson(attributeUpdateNotification).entrySet().stream()
  174 + .collect(Collectors.toMap(
  175 + Map.Entry::getKey,
  176 + entry -> entry.getValue().isJsonPrimitive() ? entry.getValue().getAsString() : entry.getValue().toString()
  177 + ));
  178 + sendRequest(sessionContext, communicationConfig, sharedAttributes);
  179 + });
  180 + }
  181 +
  182 + public void onToDeviceRpcRequest(DeviceSessionContext sessionContext, TransportProtos.ToDeviceRpcRequestMsg toDeviceRpcRequestMsg) {
  183 + SnmpMethod snmpMethod = SnmpMethod.valueOf(toDeviceRpcRequestMsg.getMethodName());
  184 + JsonObject params = JsonConverter.parse(toDeviceRpcRequestMsg.getParams()).getAsJsonObject();
  185 +
  186 + String oid = Optional.ofNullable(params.get("oid")).map(JsonElement::getAsString).orElse(null);
  187 + String value = Optional.ofNullable(params.get("value")).map(JsonElement::getAsString).orElse(null);
  188 + DataType dataType = Optional.ofNullable(params.get("dataType")).map(e -> DataType.valueOf(e.getAsString())).orElse(DataType.STRING);
  189 +
  190 + if (oid == null || oid.isEmpty()) {
  191 + throw new IllegalArgumentException("OID in to-device RPC request is not specified");
  192 + }
  193 + if (value == null && snmpMethod == SnmpMethod.SET) {
  194 + throw new IllegalArgumentException("Value must be specified for SNMP method 'SET'");
  195 + }
  196 +
  197 + PDU request = pduService.createSingleVariablePdu(sessionContext, snmpMethod, oid, value, dataType);
  198 + sendRequest(sessionContext, request, new RequestInfo(toDeviceRpcRequestMsg.getRequestId(), SnmpCommunicationSpec.TO_DEVICE_RPC_REQUEST));
  199 + }
  200 +
159 201
160 public void processResponseEvent(DeviceSessionContext sessionContext, ResponseEvent event) { 202 public void processResponseEvent(DeviceSessionContext sessionContext, ResponseEvent event) {
161 ((Snmp) event.getSource()).cancel(event.getRequest(), sessionContext); 203 ((Snmp) event.getSource()).cancel(event.getRequest(), sessionContext);
@@ -178,47 +220,59 @@ public class SnmpTransportService implements TbTransportService { @@ -178,47 +220,59 @@ public class SnmpTransportService implements TbTransportService {
178 } 220 }
179 221
180 private void processResponse(DeviceSessionContext sessionContext, PDU response, RequestInfo requestInfo) { 222 private void processResponse(DeviceSessionContext sessionContext, PDU response, RequestInfo requestInfo) {
181 - ResponseProcessor responseProcessor = responseProcessors.get(requestInfo.getCommunicationConfig().getSpec());  
182 - if (responseProcessor == null) {  
183 - return;  
184 - } 223 + ResponseProcessor responseProcessor = responseProcessors.get(requestInfo.getCommunicationSpec());
  224 + if (responseProcessor == null) return;
  225 +
  226 + JsonObject responseData = responseDataMappers.get(requestInfo.getCommunicationSpec()).map(response, requestInfo);
185 227
186 - JsonObject responseData = pduMapper.processPdu(response, sessionContext, requestInfo.getCommunicationConfig());  
187 if (responseData.entrySet().isEmpty()) { 228 if (responseData.entrySet().isEmpty()) {
188 log.debug("No values is the SNMP response for device {}. Request id: {}", sessionContext.getDeviceId(), response.getRequestID()); 229 log.debug("No values is the SNMP response for device {}. Request id: {}", sessionContext.getDeviceId(), response.getRequestID());
189 return; 230 return;
190 } 231 }
191 232
192 - responseProcessor.process(responseData, sessionContext); 233 + responseProcessor.process(responseData, requestInfo, sessionContext);
193 reportActivity(sessionContext.getSessionInfo()); 234 reportActivity(sessionContext.getSessionInfo());
194 } 235 }
195 236
  237 + private void configureResponseDataMappers() {
  238 + responseDataMappers.put(SnmpCommunicationSpec.TO_DEVICE_RPC_REQUEST, (pdu, requestInfo) -> {
  239 + JsonObject responseData = new JsonObject();
  240 + pduService.processPdu(pdu).forEach((oid, value) -> {
  241 + responseData.addProperty(oid.toDottedString(), value);
  242 + });
  243 + return responseData;
  244 + });
  245 +
  246 + ResponseDataMapper defaultResponseDataMapper = (pdu, requestInfo) -> {
  247 + return pduService.processPdu(pdu, requestInfo.getResponseMappings());
  248 + };
  249 + Arrays.stream(SnmpCommunicationSpec.values())
  250 + .forEach(communicationSpec -> {
  251 + responseDataMappers.putIfAbsent(communicationSpec, defaultResponseDataMapper);
  252 + });
  253 + }
  254 +
196 private void configureResponseProcessors() { 255 private void configureResponseProcessors() {
197 - responseProcessors.put(SnmpCommunicationSpec.TELEMETRY_QUERYING, (response, sessionContext) -> {  
198 - TransportProtos.PostTelemetryMsg postTelemetryMsg = JsonConverter.convertToTelemetryProto(response); 256 + responseProcessors.put(SnmpCommunicationSpec.TELEMETRY_QUERYING, (responseData, requestInfo, sessionContext) -> {
  257 + TransportProtos.PostTelemetryMsg postTelemetryMsg = JsonConverter.convertToTelemetryProto(responseData);
199 transportService.process(sessionContext.getSessionInfo(), postTelemetryMsg, null); 258 transportService.process(sessionContext.getSessionInfo(), postTelemetryMsg, null);
200 - log.debug("Posted telemetry for SNMP device {}: {}", sessionContext.getDeviceId(), response); 259 + log.debug("Posted telemetry for SNMP device {}: {}", sessionContext.getDeviceId(), responseData);
201 }); 260 });
202 261
203 - responseProcessors.put(SnmpCommunicationSpec.CLIENT_ATTRIBUTES_QUERYING, (response, sessionContext) -> {  
204 - TransportProtos.PostAttributeMsg postAttributesMsg = JsonConverter.convertToAttributesProto(response); 262 + responseProcessors.put(SnmpCommunicationSpec.CLIENT_ATTRIBUTES_QUERYING, (responseData, requestInfo, sessionContext) -> {
  263 + TransportProtos.PostAttributeMsg postAttributesMsg = JsonConverter.convertToAttributesProto(responseData);
205 transportService.process(sessionContext.getSessionInfo(), postAttributesMsg, null); 264 transportService.process(sessionContext.getSessionInfo(), postAttributesMsg, null);
206 - log.debug("Posted attributes for SNMP device {}: {}", sessionContext.getDeviceId(), response); 265 + log.debug("Posted attributes for SNMP device {}: {}", sessionContext.getDeviceId(), responseData);
207 }); 266 });
208 267
209 - responseProcessors.put(SnmpCommunicationSpec.TO_DEVICE_RPC_RESPONSE_QUERYING, (response, sessionContext) -> {  
210 - String rpcResponse = response.get(ToDeviceRpcResponseQueryingSnmpCommunicationConfig.RPC_RESPONSE_KEY_NAME).getAsString(); 268 + responseProcessors.put(SnmpCommunicationSpec.TO_DEVICE_RPC_REQUEST, (responseData, requestInfo, sessionContext) -> {
211 TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder() 269 TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
212 - .setPayload(rpcResponse) 270 + .setRequestId(requestInfo.getRequestId())
  271 + .setPayload(JsonConverter.toJson(responseData))
213 .build(); 272 .build();
214 transportService.process(sessionContext.getSessionInfo(), rpcResponseMsg, null); 273 transportService.process(sessionContext.getSessionInfo(), rpcResponseMsg, null);
215 - log.debug("Processed RPC response from device {}: {}", sessionContext.getDeviceId(), rpcResponse); 274 + log.debug("Posted RPC response {} for device {}", responseData, sessionContext.getDeviceId());
216 }); 275 });
217 -  
218 -// responseProcessors.put(, (response, sessionContext) -> {  
219 -// TransportProtos.ClaimDeviceMsg claimDeviceMsg = JsonConverter.convertToClaimDeviceProto(sessionContext.getDeviceId(), response);  
220 -// transportService.process(sessionContext.getSessionInfo(), claimDeviceMsg, null);  
221 -// });  
222 } 276 }
223 277
224 private void reportActivity(TransportProtos.SessionInfoProto sessionInfo) { 278 private void reportActivity(TransportProtos.SessionInfoProto sessionInfo) {
@@ -256,12 +310,31 @@ public class SnmpTransportService implements TbTransportService { @@ -256,12 +310,31 @@ public class SnmpTransportService implements TbTransportService {
256 310
257 @Data 311 @Data
258 private static class RequestInfo { 312 private static class RequestInfo {
259 - private final DeviceProfileId deviceProfileId;  
260 - private final SnmpCommunicationConfig communicationConfig; 313 + private Integer requestId;
  314 + private SnmpCommunicationSpec communicationSpec;
  315 + private List<SnmpMapping> responseMappings;
  316 +
  317 + public RequestInfo(Integer requestId, SnmpCommunicationSpec communicationSpec) {
  318 + this.requestId = requestId;
  319 + this.communicationSpec = communicationSpec;
  320 + }
  321 +
  322 + public RequestInfo(SnmpCommunicationSpec communicationSpec) {
  323 + this.communicationSpec = communicationSpec;
  324 + }
  325 +
  326 + public RequestInfo(SnmpCommunicationSpec communicationSpec, List<SnmpMapping> responseMappings) {
  327 + this.communicationSpec = communicationSpec;
  328 + this.responseMappings = responseMappings;
  329 + }
  330 + }
  331 +
  332 + private interface ResponseDataMapper {
  333 + JsonObject map(PDU pdu, RequestInfo requestInfo);
261 } 334 }
262 335
263 private interface ResponseProcessor { 336 private interface ResponseProcessor {
264 - void process(JsonObject responseData, DeviceSessionContext sessionContext); 337 + void process(JsonObject responseData, RequestInfo requestInfo, DeviceSessionContext sessionContext);
265 } 338 }
266 339
267 } 340 }
@@ -26,11 +26,7 @@ import org.thingsboard.server.common.data.DeviceProfile; @@ -26,11 +26,7 @@ import org.thingsboard.server.common.data.DeviceProfile;
26 import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration; 26 import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration;
27 import org.thingsboard.server.common.data.device.profile.SnmpDeviceProfileTransportConfiguration; 27 import org.thingsboard.server.common.data.device.profile.SnmpDeviceProfileTransportConfiguration;
28 import org.thingsboard.server.common.data.id.DeviceId; 28 import org.thingsboard.server.common.data.id.DeviceId;
29 -import org.thingsboard.server.common.data.transport.snmp.SnmpCommunicationSpec;  
30 -import org.thingsboard.server.common.data.transport.snmp.config.SnmpCommunicationConfig;  
31 -import org.thingsboard.server.common.data.transport.snmp.config.impl.ToDeviceRpcCommandSettingSnmpCommunicationConfig;  
32 import org.thingsboard.server.common.transport.SessionMsgListener; 29 import org.thingsboard.server.common.transport.SessionMsgListener;
33 -import org.thingsboard.server.common.transport.adaptor.JsonConverter;  
34 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; 30 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
35 import org.thingsboard.server.gen.transport.TransportProtos; 31 import org.thingsboard.server.gen.transport.TransportProtos;
36 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; 32 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
@@ -40,15 +36,11 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMs @@ -40,15 +36,11 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMs
40 import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg; 36 import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;
41 import org.thingsboard.server.transport.snmp.SnmpTransportContext; 37 import org.thingsboard.server.transport.snmp.SnmpTransportContext;
42 38
43 -import java.io.IOException;  
44 import java.util.LinkedList; 39 import java.util.LinkedList;
45 import java.util.List; 40 import java.util.List;
46 -import java.util.Map;  
47 -import java.util.Optional;  
48 import java.util.UUID; 41 import java.util.UUID;
49 import java.util.concurrent.ScheduledFuture; 42 import java.util.concurrent.ScheduledFuture;
50 import java.util.concurrent.atomic.AtomicInteger; 43 import java.util.concurrent.atomic.AtomicInteger;
51 -import java.util.stream.Collectors;  
52 44
53 @Slf4j 45 @Slf4j
54 public class DeviceSessionContext extends DeviceAwareSessionContext implements SessionMsgListener, ResponseListener { 46 public class DeviceSessionContext extends DeviceAwareSessionContext implements SessionMsgListener, ResponseListener {
@@ -136,15 +128,7 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S @@ -136,15 +128,7 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
136 128
137 @Override 129 @Override
138 public void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification) { 130 public void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification) {
139 - getCommunicationConfigForSpec(SnmpCommunicationSpec.SHARED_ATTRIBUTES_SETTING)  
140 - .ifPresent(communicationConfig -> {  
141 - Map<String, String> sharedAttributes = JsonConverter.toJson(attributeUpdateNotification).entrySet().stream()  
142 - .collect(Collectors.toMap(  
143 - Map.Entry::getKey,  
144 - entry -> entry.getValue().isJsonPrimitive() ? entry.getValue().getAsString() : entry.getValue().toString()  
145 - ));  
146 - snmpTransportContext.getSnmpTransportService().sendRequest(this, communicationConfig, sharedAttributes);  
147 - }); 131 + snmpTransportContext.getSnmpTransportService().onAttributeUpdate(this, attributeUpdateNotification);
148 } 132 }
149 133
150 @Override 134 @Override
@@ -153,23 +137,10 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S @@ -153,23 +137,10 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
153 137
154 @Override 138 @Override
155 public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) { 139 public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) {
156 - getCommunicationConfigForSpec(SnmpCommunicationSpec.TO_DEVICE_RPC_COMMAND_SETTING)  
157 - .ifPresent(communicationConfig -> {  
158 - String value = JsonConverter.toJson(toDeviceRequest, true).toString();  
159 - snmpTransportContext.getSnmpTransportService().sendRequest(  
160 - this, communicationConfig,  
161 - Map.of(ToDeviceRpcCommandSettingSnmpCommunicationConfig.RPC_COMMAND_KEY_NAME, value)  
162 - );  
163 - }); 140 + snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest);
164 } 141 }
165 142
166 @Override 143 @Override
167 public void onToServerRpcResponse(ToServerRpcResponseMsg toServerResponse) { 144 public void onToServerRpcResponse(ToServerRpcResponseMsg toServerResponse) {
168 } 145 }
169 -  
170 - private Optional<SnmpCommunicationConfig> getCommunicationConfigForSpec(SnmpCommunicationSpec spec) {  
171 - return profileTransportConfiguration.getCommunicationConfigs().stream()  
172 - .filter(config -> config.getSpec() == spec)  
173 - .findFirst();  
174 - }  
175 } 146 }
@@ -558,6 +558,14 @@ public class JsonConverter { @@ -558,6 +558,14 @@ public class JsonConverter {
558 } 558 }
559 } 559 }
560 560
  561 + public static JsonElement parse(String json) {
  562 + return JSON_PARSER.parse(json);
  563 + }
  564 +
  565 + public static String toJson(JsonElement element) {
  566 + return GSON.toJson(element);
  567 + }
  568 +
561 public static void setTypeCastEnabled(boolean enabled) { 569 public static void setTypeCastEnabled(boolean enabled) {
562 isTypeCastEnabled = enabled; 570 isTypeCastEnabled = enabled;
563 } 571 }
@@ -599,8 +607,7 @@ public class JsonConverter { @@ -599,8 +607,7 @@ public class JsonConverter {
599 .build(); 607 .build();
600 } 608 }
601 609
602 - private static TransportProtos.ProvisionDeviceCredentialsMsg buildProvisionDeviceCredentialsMsg(String  
603 - provisionKey, String provisionSecret) { 610 + private static TransportProtos.ProvisionDeviceCredentialsMsg buildProvisionDeviceCredentialsMsg(String provisionKey, String provisionSecret) {
604 return TransportProtos.ProvisionDeviceCredentialsMsg.newBuilder() 611 return TransportProtos.ProvisionDeviceCredentialsMsg.newBuilder()
605 .setProvisionDeviceKey(provisionKey) 612 .setProvisionDeviceKey(provisionKey)
606 .setProvisionDeviceSecret(provisionSecret) 613 .setProvisionDeviceSecret(provisionSecret)