Commit 415bf570bac6b422f880622163924464d6ca390b

Authored by Viacheslav Klimov
1 parent e52ac96c

Refactor

... ... @@ -17,16 +17,21 @@ package org.thingsboard.server.common.data.device.data;
17 17
18 18 import com.fasterxml.jackson.annotation.JsonIgnore;
19 19 import lombok.Data;
  20 +import lombok.ToString;
  21 +import org.apache.commons.lang3.ObjectUtils;
20 22 import org.apache.commons.lang3.StringUtils;
21 23 import org.thingsboard.server.common.data.DeviceTransportType;
22 24 import org.thingsboard.server.common.data.transport.snmp.AuthenticationProtocol;
23 25 import org.thingsboard.server.common.data.transport.snmp.PrivacyProtocol;
24 26 import org.thingsboard.server.common.data.transport.snmp.SnmpProtocolVersion;
25 27
  28 +import java.util.Objects;
  29 +
26 30 @Data
  31 +@ToString(of = {"host", "port", "protocolVersion"})
27 32 public class SnmpDeviceTransportConfiguration implements DeviceTransportConfiguration {
28   - private String address;
29   - private int port;
  33 + private String host;
  34 + private Integer port;
30 35 private SnmpProtocolVersion protocolVersion;
31 36
32 37 /*
... ... @@ -60,6 +65,21 @@ public class SnmpDeviceTransportConfiguration implements DeviceTransportConfigur
60 65
61 66 @JsonIgnore
62 67 private boolean isValid() {
63   - return true;
  68 + boolean isValid = StringUtils.isNotBlank(host) && port != null && protocolVersion != null;
  69 + if (isValid) {
  70 + switch (protocolVersion) {
  71 + case V1:
  72 + case V2C:
  73 + isValid = StringUtils.isNotEmpty(community);
  74 + break;
  75 + case V3:
  76 + isValid = StringUtils.isNotBlank(username) && StringUtils.isNotBlank(securityName)
  77 + && contextName != null && authenticationProtocol != null
  78 + && StringUtils.isNotBlank(authenticationPassphrase)
  79 + && privacyProtocol != null && privacyPassphrase != null && engineId != null;
  80 + break;
  81 + }
  82 + }
  83 + return isValid;
64 84 }
65 85 }
... ...
... ... @@ -17,15 +17,21 @@ package org.thingsboard.server.common.data.device.profile;
17 17
18 18 import com.fasterxml.jackson.annotation.JsonIgnore;
19 19 import lombok.Data;
  20 +import org.apache.commons.lang3.ArrayUtils;
20 21 import org.thingsboard.server.common.data.DeviceTransportType;
  22 +import org.thingsboard.server.common.data.transport.snmp.SnmpMapping;
21 23 import org.thingsboard.server.common.data.transport.snmp.configs.SnmpCommunicationConfig;
22 24
  25 +import java.util.Collections;
23 26 import java.util.List;
  27 +import java.util.function.Function;
  28 +import java.util.stream.Collectors;
  29 +import java.util.stream.Stream;
24 30
25 31 @Data
26 32 public class SnmpDeviceProfileTransportConfiguration implements DeviceProfileTransportConfiguration {
27   - private int timeoutMs;
28   - private int retries;
  33 + private Integer timeoutMs;
  34 + private Integer retries;
29 35 private List<SnmpCommunicationConfig> communicationConfigs;
30 36
31 37 @Override
... ... @@ -36,12 +42,16 @@ public class SnmpDeviceProfileTransportConfiguration implements DeviceProfileTra
36 42 @Override
37 43 public void validate() {
38 44 if (!isValid()) {
39   - throw new IllegalArgumentException("Transport configuration is not valid");
  45 + throw new IllegalArgumentException("SNMP transport configuration is not valid");
40 46 }
41 47 }
42 48
43 49 @JsonIgnore
44 50 private boolean isValid() {
45   - return true;
  51 + return timeoutMs != null && timeoutMs >= 0 && retries != null && retries >= 0
  52 + && communicationConfigs != null && !communicationConfigs.isEmpty()
  53 + && communicationConfigs.stream().allMatch(config -> config != null && config.isValid())
  54 + && communicationConfigs.stream().flatMap(config -> config.getMappings().stream()).map(SnmpMapping::getOid)
  55 + .distinct().count() == communicationConfigs.stream().mapToInt(config -> config.getMappings().size()).sum();
46 56 }
47 57 }
... ...
... ... @@ -31,6 +31,6 @@ public abstract class RepeatingQueryingSnmpCommunicationConfig extends SnmpCommu
31 31
32 32 @Override
33 33 public boolean isValid() {
34   - return true;
  34 + return super.isValid() && queryingFrequencyMs != null && queryingFrequencyMs > 0;
35 35 }
36 36 }
... ...
... ... @@ -28,9 +28,4 @@ public class SharedAttributesSettingSnmpCommunicationConfig extends SnmpCommunic
28 28 public SnmpMethod getMethod() {
29 29 return SnmpMethod.SET;
30 30 }
31   -
32   - @Override
33   - public boolean isValid() {
34   - return true;
35   - }
36 31 }
... ...
... ... @@ -49,12 +49,6 @@ public abstract class SnmpCommunicationConfig {
49 49
50 50 @JsonIgnore
51 51 public boolean isValid() {
52   - return true;
53   - }
54   -
55   - public void validate() {
56   - if (!isValid()) {
57   - throw new IllegalArgumentException("Communication config is not valid");
58   - }
  52 + return mappings != null && !mappings.isEmpty() && mappings.stream().allMatch(mapping -> mapping != null && mapping.isValid());
59 53 }
60 54 }
... ...
... ... @@ -24,7 +24,6 @@ import org.snmp4j.security.SecurityLevel;
24 24 import org.snmp4j.security.SecurityModel;
25 25 import org.snmp4j.security.SecurityProtocols;
26 26 import org.snmp4j.security.USM;
27   -import org.snmp4j.security.UsmUser;
28 27 import org.snmp4j.smi.GenericAddress;
29 28 import org.snmp4j.smi.OID;
30 29 import org.snmp4j.smi.OctetString;
... ... @@ -98,7 +97,7 @@ public class SnmpAuthService {
98 97 throw new UnsupportedOperationException("SNMP protocol version " + protocolVersion + " is not supported");
99 98 }
100 99
101   - target.setAddress(GenericAddress.parse(snmpUnderlyingProtocol + ":" + deviceTransportConfig.getAddress() + "/" + deviceTransportConfig.getPort()));
  100 + target.setAddress(GenericAddress.parse(snmpUnderlyingProtocol + ":" + deviceTransportConfig.getHost() + "/" + deviceTransportConfig.getPort()));
102 101 target.setTimeout(profileTransportConfig.getTimeoutMs());
103 102 target.setRetries(profileTransportConfig.getRetries());
104 103 target.setVersion(protocolVersion.getCode());
... ...
... ... @@ -89,18 +89,12 @@ public class SnmpTransportContext extends TransportContext {
89 89 managedDevicesIds.stream()
90 90 .map(protoEntityService::getDeviceById)
91 91 .collect(Collectors.toList())
92   - .forEach(device -> {
93   - try {
94   - establishDeviceSession(device);
95   - } catch (Exception e) {
96   - log.error("Failed to establish session for SNMP device {}: {}", device.getId(), e.getMessage());
97   - }
98   - });
  92 + .forEach(this::establishDeviceSession);
99 93 }
100 94
101 95 private void establishDeviceSession(Device device) {
102 96 if (device == null) return;
103   - log.info("Establishing SNMP device session for device {}", device.getId());
  97 + log.info("Establishing SNMP session for device {}", device.getId());
104 98
105 99 DeviceProfileId deviceProfileId = device.getDeviceProfileId();
106 100 DeviceProfile deviceProfile = deviceProfileCache.get(deviceProfileId);
... ... @@ -114,18 +108,24 @@ public class SnmpTransportContext extends TransportContext {
114 108 SnmpDeviceProfileTransportConfiguration profileTransportConfiguration = (SnmpDeviceProfileTransportConfiguration) deviceProfile.getProfileData().getTransportConfiguration();
115 109 SnmpDeviceTransportConfiguration deviceTransportConfiguration = (SnmpDeviceTransportConfiguration) device.getDeviceData().getTransportConfiguration();
116 110
117   - DeviceSessionContext deviceSessionContext = new DeviceSessionContext(
118   - device, deviceProfile, credentials.getCredentialsId(),
119   - profileTransportConfiguration, deviceTransportConfiguration, this
120   - );
121   - registerSessionMsgListener(deviceSessionContext);
  111 + DeviceSessionContext deviceSessionContext;
  112 + try {
  113 + deviceSessionContext = new DeviceSessionContext(
  114 + device, deviceProfile, credentials.getCredentialsId(),
  115 + profileTransportConfiguration, deviceTransportConfiguration, this
  116 + );
  117 + registerSessionMsgListener(deviceSessionContext);
  118 + } catch (Exception e) {
  119 + log.error("Failed to establish session for SNMP device {}: {}", device.getId(), e.getMessage());
  120 + return;
  121 + }
122 122 sessions.put(device.getId(), deviceSessionContext);
123 123 snmpTransportService.createQueryingTasks(deviceSessionContext);
124 124 log.info("Established SNMP device session for device {}", device.getId());
125 125 }
126 126
127 127 private void updateDeviceSession(DeviceSessionContext sessionContext, Device device, DeviceProfile deviceProfile) {
128   - log.info("Updating SNMP device session for device {}", device.getId());
  128 + log.info("Updating SNMP session for device {}", device.getId());
129 129
130 130 DeviceCredentials credentials = protoEntityService.getDeviceCredentialsByDeviceId(device.getId());
131 131 if (credentials.getCredentialsType() != DeviceCredentialsType.ACCESS_TOKEN) {
... ... @@ -137,16 +137,20 @@ public class SnmpTransportContext extends TransportContext {
137 137 SnmpDeviceProfileTransportConfiguration newProfileTransportConfiguration = (SnmpDeviceProfileTransportConfiguration) deviceProfile.getProfileData().getTransportConfiguration();
138 138 SnmpDeviceTransportConfiguration newDeviceTransportConfiguration = (SnmpDeviceTransportConfiguration) device.getDeviceData().getTransportConfiguration();
139 139
140   - if (!newProfileTransportConfiguration.equals(sessionContext.getProfileTransportConfiguration())) {
141   - sessionContext.setProfileTransportConfiguration(newProfileTransportConfiguration);
142   - sessionContext.initializeTarget(newProfileTransportConfiguration, newDeviceTransportConfiguration);
143   - snmpTransportService.cancelQueryingTasks(sessionContext);
144   - snmpTransportService.createQueryingTasks(sessionContext);
145   - } else if (!newDeviceTransportConfiguration.equals(sessionContext.getDeviceTransportConfiguration())) {
146   - sessionContext.setDeviceTransportConfiguration(newDeviceTransportConfiguration);
147   - sessionContext.initializeTarget(newProfileTransportConfiguration, newDeviceTransportConfiguration);
148   - } else {
149   - log.trace("Configuration of the device {} was not updated", device);
  140 + try {
  141 + if (!newProfileTransportConfiguration.equals(sessionContext.getProfileTransportConfiguration())) {
  142 + sessionContext.setProfileTransportConfiguration(newProfileTransportConfiguration);
  143 + sessionContext.initializeTarget(newProfileTransportConfiguration, newDeviceTransportConfiguration);
  144 + snmpTransportService.cancelQueryingTasks(sessionContext);
  145 + snmpTransportService.createQueryingTasks(sessionContext);
  146 + } else if (!newDeviceTransportConfiguration.equals(sessionContext.getDeviceTransportConfiguration())) {
  147 + sessionContext.setDeviceTransportConfiguration(newDeviceTransportConfiguration);
  148 + sessionContext.initializeTarget(newProfileTransportConfiguration, newDeviceTransportConfiguration);
  149 + } else {
  150 + log.trace("Configuration of the device {} was not updated", device);
  151 + }
  152 + } catch (Exception e) {
  153 + log.error("Failed to update session for SNMP device {}: {}", sessionContext.getDeviceId(), e.getMessage());
150 154 }
151 155 }
152 156
... ... @@ -156,8 +160,8 @@ public class SnmpTransportContext extends TransportContext {
156 160 sessionContext.close();
157 161 snmpAuthService.cleanUpSnmpAuthInfo(sessionContext);
158 162 transportService.deregisterSession(sessionContext.getSessionInfo());
159   - sessions.remove(sessionContext.getDeviceId());
160 163 snmpTransportService.cancelQueryingTasks(sessionContext);
  164 + sessions.remove(sessionContext.getDeviceId());
161 165 log.trace("Unregistered and removed session");
162 166 }
163 167
... ...
... ... @@ -28,9 +28,11 @@ import org.snmp4j.mp.MPv3;
28 28 import org.snmp4j.security.SecurityModels;
29 29 import org.snmp4j.security.SecurityProtocols;
30 30 import org.snmp4j.security.USM;
  31 +import org.snmp4j.smi.Integer32;
31 32 import org.snmp4j.smi.Null;
32 33 import org.snmp4j.smi.OID;
33 34 import org.snmp4j.smi.OctetString;
  35 +import org.snmp4j.smi.Variable;
34 36 import org.snmp4j.smi.VariableBinding;
35 37 import org.snmp4j.transport.DefaultTcpTransportMapping;
36 38 import org.snmp4j.transport.DefaultUdpTransportMapping;
... ... @@ -143,7 +145,7 @@ public class SnmpTransportService implements TbTransportService {
143 145 sendRequest(sessionContext, communicationConfig);
144 146 }
145 147 } catch (Exception e) {
146   - log.error("Failed to send SNMP request for device {}: {}", sessionContext.getDeviceId(), e.getMessage());
  148 + log.error("Failed to send SNMP request for device {}: {}", sessionContext.getDeviceId(), e.toString());
147 149 }
148 150 }, queryingFrequency, queryingFrequency, TimeUnit.MILLISECONDS);
149 151 }
... ... @@ -192,7 +194,24 @@ public class SnmpTransportService implements TbTransportService {
192 194 pdu.addAll(communicationConfig.getMappings().stream()
193 195 .filter(mapping -> values.isEmpty() || values.containsKey(mapping.getKey()))
194 196 .map(mapping -> Optional.ofNullable(values.get(mapping.getKey()))
195   - .map(value -> new VariableBinding(new OID(mapping.getOid()), new OctetString(values.get(mapping.getKey()))))
  197 + .map(value -> {
  198 + Variable variable;
  199 + switch (mapping.getDataType()) {
  200 + case LONG:
  201 + try {
  202 + variable = new Integer32(Integer.parseInt(value));
  203 + break;
  204 + } catch (NumberFormatException ignored) {
  205 + }
  206 + case DOUBLE:
  207 + case BOOLEAN:
  208 + case STRING:
  209 + case JSON:
  210 + default:
  211 + variable = new OctetString(value);
  212 + }
  213 + return new VariableBinding(new OID(mapping.getOid()), variable);
  214 + })
196 215 .orElseGet(() -> new VariableBinding(new OID(mapping.getOid()))))
197 216 .collect(Collectors.toList()));
198 217
... ... @@ -267,7 +286,9 @@ public class SnmpTransportService implements TbTransportService {
267 286 responses.forEach((spec, response) -> {
268 287 Optional.ofNullable(responseProcessors.get(spec))
269 288 .ifPresent(responseProcessor -> {
270   - responseProcessor.accept(response, sessionContext);
  289 + if (!response.entrySet().isEmpty()) {
  290 + responseProcessor.accept(response, sessionContext);
  291 + }
271 292 });
272 293 });
273 294
... ...
... ... @@ -63,8 +63,6 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
63 63 private final Device device;
64 64
65 65 private final SnmpTransportContext snmpTransportContext;
66   - private final SnmpTransportService snmpTransportService;
67   - private final SnmpAuthService snmpAuthService;
68 66
69 67 @Getter
70 68 @Setter
... ... @@ -80,7 +78,7 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
80 78 public DeviceSessionContext(Device device, DeviceProfile deviceProfile, String token,
81 79 SnmpDeviceProfileTransportConfiguration profileTransportConfiguration,
82 80 SnmpDeviceTransportConfiguration deviceTransportConfiguration,
83   - SnmpTransportContext snmpTransportContext) {
  81 + SnmpTransportContext snmpTransportContext) throws Exception {
84 82 super(UUID.randomUUID());
85 83 super.setDeviceId(device.getId());
86 84 super.setDeviceProfile(deviceProfile);
... ... @@ -88,8 +86,6 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
88 86
89 87 this.token = token;
90 88 this.snmpTransportContext = snmpTransportContext;
91   - this.snmpTransportService = snmpTransportContext.getSnmpTransportService();
92   - this.snmpAuthService = snmpTransportContext.getSnmpAuthService();
93 89
94 90 this.profileTransportConfiguration = profileTransportConfiguration;
95 91 this.deviceTransportConfiguration = deviceTransportConfiguration;
... ... @@ -113,13 +109,13 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
113 109 @Override
114 110 public void onResponse(ResponseEvent event) {
115 111 if (isActive) {
116   - snmpTransportService.processResponseEvent(this, event);
  112 + snmpTransportContext.getSnmpTransportService().processResponseEvent(this, event);
117 113 }
118 114 }
119 115
120   - public void initializeTarget(SnmpDeviceProfileTransportConfiguration profileTransportConfig, SnmpDeviceTransportConfiguration deviceTransportConfig) {
  116 + public void initializeTarget(SnmpDeviceProfileTransportConfiguration profileTransportConfig, SnmpDeviceTransportConfiguration deviceTransportConfig) throws Exception {
121 117 log.trace("Initializing target for SNMP session of device {}", device);
122   - this.target = snmpAuthService.setUpSnmpTarget(profileTransportConfig, deviceTransportConfig);
  118 + this.target = snmpTransportContext.getSnmpAuthService().setUpSnmpTarget(profileTransportConfig, deviceTransportConfig);
123 119 log.info("SNMP target initialized: {}", target);
124 120 }
125 121
... ... @@ -152,7 +148,7 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
152 148 entry -> entry.getValue().isJsonPrimitive() ? entry.getValue().getAsString() : entry.getValue().toString()
153 149 ));
154 150 try {
155   - snmpTransportService.sendRequest(this, communicationConfig, sharedAttributes);
  151 + snmpTransportContext.getSnmpTransportService().sendRequest(this, communicationConfig, sharedAttributes);
156 152 } catch (Exception e) {
157 153 log.error("Failed to send request with shared attributes to SNMP device {}: {}", getDeviceId(), e.getMessage());
158 154 }
... ...