Skip to content
Open

tl #4

Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 93 additions & 9 deletions subset/network/network_tests.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import subprocess, time, sys, json

import re
import datetime

arguments = sys.argv

test_request = str(arguments[1])
Expand All @@ -24,13 +27,18 @@
description_communication_type = 'Device sends unicast or broadcast packets.'
description_ntp_support = 'Device sends NTP request packets.'

tcpdump_display_all_packets = 'tcpdump -n src host ' + device_address + ' -r ' + cap_pcap_file
tcpdump_display_all_packets = 'tcpdump -tttt -n src host ' + device_address + ' -r ' + cap_pcap_file
tcpdump_display_udp_bacnet_packets = 'tcpdump -n udp dst portrange 47808-47809 -r ' + cap_pcap_file
tcpdump_display_arp_packets = 'tcpdump arp -r ' + cap_pcap_file
tcpdump_display_arp_packets = 'tcpdump arp -n src host ' + device_address + ' -r ' + cap_pcap_file
tcpdump_display_ntp_packets = 'tcpdump dst port 123 -r ' + cap_pcap_file
tcpdump_display_eapol_packets = 'tcpdump port 1812 or port 1813 or port 3799 -r ' + cap_pcap_file
tcpdump_display_broadcast_packets = 'tcpdump broadcast and src host ' + device_address + ' -r ' + cap_pcap_file

system_conf_file = "/config/inst/system.conf"
tcpdump_date_format = "%Y-%m-%d %H:%M:%S.%f"
min_send_seconds = 300
min_send_duration = "5 minutes"

def write_report(string_to_append):
print(string_to_append.strip())
with open(report_filename, 'a+') as file_open:
Expand All @@ -49,7 +57,7 @@ def add_packet_count_to_report(packet_type, packet_count):
write_report("{i} {t} Packets recieved={p}\n".format(i=ignore, t=packet_type, p=packet_count))

def add_packet_info_to_report(packets_received):
packet_list = packets_received.rstrip().split("\n")
packet_list = packets_received.strip().split("\n")
outnum = min(len(packet_list), max_packets_in_report)
for x in range(0, outnum):
write_report("{i} {p}\n".format(i=ignore, p=packet_list[x]))
Expand Down Expand Up @@ -99,19 +107,95 @@ def decode_json_config(config_file, map_name, action):
elif action == 'remove':
remove_from_port_list(port_map)


def get_scan_length(config_file):
""" Gets length of the monitor.pcap scan

Reads the system.conf file to and returns the length of the monitor_scan

Args:
config_file: Location of system.conf file within test container

Returns:
Length of monitor scan in seconds

If not defined, or system.conf could not be found
returns false
"""

scan_length = False
try:
with open(config_file) as file:
for line in file:
match = re.search(r'^monitor_scan_sec=(\d+)', line)
if match:
matched_length = int(match.group(1))
# If scan length = 0 or not found, then monitor scan does not exist
scan_length = matched_length if matched_length > 0 else False
return scan_length
except Exception as e:
write_report("Error encountered reading system.conf {}".format(e))
return False

def test_connection_min_send():
""" Runs the connection.min_send test

Tests if the device sends data packets of any type (inc data, NTP, etc)
within a period of 5 minutes by looking through the monitor.pcap file

The length of test can be configured using the min_send_seconds variable
at the start of the file
"""

# Get scan length
scan_length = get_scan_length(system_conf_file)
min_send_delta = datetime.timedelta(seconds=min_send_seconds)
min_send_pass = False

# The test scans the monitor.pcap, so if it's not found skip
if not scan_length:
add_summary("DAQ monitor scan not running, test skipped")
return 'skip'

arp_shell_result = shell_command_with_result(tcpdump_display_arp_packets, 0, False)
arp_packets_received = packets_received_count(arp_shell_result)
if arp_packets_received > 0:
add_summary("ARP packets received.")

shell_result = shell_command_with_result(tcpdump_display_all_packets, 0, False)
all_packets_received = packets_received_count(shell_result)
app_packets_received = all_packets_received - arp_packets_received
if app_packets_received > 0:
add_summary("Other packets received.")
print('min_send_packets', arp_packets_received, all_packets_received)
all_packets = shell_result.splitlines()

# Loop through tcpdump result and measure the time between succesive packets
for i, packet in enumerate(all_packets):
# datetime is the first 26 characters of the line
packet_time = datetime.datetime.strptime(packet[:26], tcpdump_date_format)

if i == 0:
previous_packet_time = packet_time
continue

delta = packet_time - previous_packet_time
if delta < min_send_delta:
min_send_pass = True
break

previous_packet_time = packet_time

add_packet_info_to_report(shell_result)
return 'pass' if app_packets_received > 0 else 'fail'

if not min_send_pass:
if scan_length > min_send_seconds:
add_summary('Data packets were not sent at a frequency less than ' +
min_send_duration)
return 'fail'
else:
add_summary('Please set DAQ monitor scan to be greater than ' +
min_send_duration)
return 'skip'

add_summary('Data packets were sent at a frequency of less than ' +
min_send_duration)
return 'pass'

def test_connection_dhcp_long():
shell_result = shell_command_with_result(tcpdump_display_arp_packets, 0, False)
Expand Down