mirror of https://github.com/nirenjan/libx52.git
204 lines
6.7 KiB
Python
Executable File
204 lines
6.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Test communication with x52d and verify that the behavior is as expected"""
|
|
# pylint: disable=consider-using-f-string
|
|
|
|
import glob
|
|
import os
|
|
import os.path
|
|
import platform
|
|
import shlex
|
|
import signal
|
|
import subprocess
|
|
import tempfile
|
|
import time
|
|
import sys
|
|
|
|
class TestCase:
|
|
"""TestCase class handles an individual test case"""
|
|
def __init__(self, data):
|
|
"""Create a new test case"""
|
|
self.desc = None
|
|
self.in_cmd = None
|
|
self.exp_resp = None
|
|
self.parse(data)
|
|
|
|
def parse(self, data):
|
|
"""Parses a string of the following form:
|
|
<description>
|
|
space separated input line, possibly quoted
|
|
space separated expected response, possibly quoted
|
|
"""
|
|
self.desc, in_cmd, exp_resp = data.splitlines()
|
|
self.in_cmd = shlex.split(in_cmd)
|
|
self.exp_resp = '\0'.join(shlex.split(exp_resp)).encode() + b'\0'
|
|
|
|
def execute(self, index, suite):
|
|
"""Execute the test case and return the result in TAP format"""
|
|
def dump_failed(name, value):
|
|
"""Dump the failed test case"""
|
|
print("# {}".format(name))
|
|
for argv in value.decode().split('\0'):
|
|
print("#\t {}".format(argv))
|
|
print()
|
|
|
|
def print_result(passed):
|
|
"""Print the test case result and description"""
|
|
out = "ok {} - {}".format(index+1, self.desc)
|
|
if not passed:
|
|
out = "not " + out
|
|
print(out)
|
|
|
|
cmd = [suite.find_control_program(),
|
|
'-s', suite.command, '--',
|
|
*self.in_cmd]
|
|
|
|
testcase = subprocess.run(cmd, stdout=subprocess.PIPE, check=False)
|
|
if testcase.returncode != 0:
|
|
print_result(False)
|
|
print("# x52ctl returned code: {}".format(testcase.returncode))
|
|
dump_failed("Expected", self.exp_resp)
|
|
dump_failed("Got", testcase.stdout)
|
|
elif testcase.stdout != self.exp_resp:
|
|
print_result(False)
|
|
dump_failed("Expected", self.exp_resp)
|
|
dump_failed("Got", testcase.stdout)
|
|
else:
|
|
print_result(True)
|
|
|
|
|
|
class Test:
|
|
"""Test class runs a series of unit tests"""
|
|
|
|
def __init__(self):
|
|
"""Create a new instance of the Test class"""
|
|
self.program = self.find_daemon_program()
|
|
self.tmpdir = tempfile.TemporaryDirectory() # pylint: disable=consider-using-with
|
|
self.command = os.path.join(self.tmpdir.name, "x52d.cmd")
|
|
self.notify = os.path.join(self.tmpdir.name, "x52d.notify")
|
|
self.daemon = None
|
|
self.testcases = []
|
|
|
|
def __enter__(self):
|
|
"""Context manager entry"""
|
|
self.launch_daemon()
|
|
return self
|
|
|
|
def __exit__(self, *exc):
|
|
"""Context manager exit"""
|
|
self.terminate_daemon()
|
|
self.tmpdir.cleanup()
|
|
|
|
@staticmethod
|
|
def find_daemon_program():
|
|
"""Find the daemon program. This script should be run from the
|
|
root of the build directory"""
|
|
daemon_candidates = glob.glob('**/x52d', recursive=True)
|
|
if not daemon_candidates:
|
|
print("Bail out! Unable to find X52 daemon.")
|
|
sys.exit(1)
|
|
|
|
return os.path.realpath(daemon_candidates[0])
|
|
|
|
@staticmethod
|
|
def find_control_program():
|
|
"""Find the control program. This script should be run from the
|
|
root of the build directory"""
|
|
ctl_candidates = glob.glob('**/x52ctl', recursive=True)
|
|
if not ctl_candidates:
|
|
print("Bail out! Unable to find x52ctl.")
|
|
sys.exit(1)
|
|
|
|
return os.path.realpath(ctl_candidates[0])
|
|
|
|
def launch_daemon(self):
|
|
"""Launch an instance of the running daemon"""
|
|
if self.daemon is not None:
|
|
# We've already started the daemon, check if it is still running
|
|
if self.daemon.poll() is None:
|
|
return
|
|
|
|
self.daemon = None
|
|
|
|
daemon_cmdline = [
|
|
self.program,
|
|
"-f", # Run in foreground
|
|
"-q", # Quiet logging
|
|
"-c", os.path.join(self.tmpdir.name, "x52d.cfg"), # Default config file
|
|
"-l", os.path.join(self.tmpdir.name, "x52d.log"), # Output logs to log file
|
|
"-p", os.path.join(self.tmpdir.name, "x52d.pid"), # PID file
|
|
"-s", self.command, # Command socket path
|
|
"-b", self.notify, # Notification socket path
|
|
]
|
|
|
|
# Create empty config file
|
|
with open(daemon_cmdline[4], 'w', encoding='utf-8'):
|
|
pass
|
|
|
|
self.daemon = subprocess.Popen(daemon_cmdline) # pylint: disable=consider-using-with
|
|
|
|
print("# Sleeping 2 seconds for daemon to start")
|
|
time.sleep(2)
|
|
|
|
def terminate_daemon(self):
|
|
"""Terminate a running daemon"""
|
|
if self.daemon is None:
|
|
return
|
|
|
|
# Send a SIGTERM to the daemon
|
|
os.kill(self.daemon.pid, signal.SIGTERM)
|
|
try:
|
|
self.daemon.wait(timeout=15)
|
|
except subprocess.TimeoutExpired:
|
|
# Forcibly kill the running process
|
|
self.daemon.kill()
|
|
finally:
|
|
self.daemon = None
|
|
|
|
def append(self, testcase):
|
|
"""Add one testcase to the test case list"""
|
|
self.testcases.append(testcase)
|
|
|
|
def extend(self, testcases):
|
|
"""Add one or more testcases to the test case list"""
|
|
self.testcases.extend(testcases)
|
|
|
|
@staticmethod
|
|
def dump_failed(name, value):
|
|
"""Dump the failed test case"""
|
|
print("# {}".format(name))
|
|
for argv in value.decode().split('\0'):
|
|
print("#\t {}".format(argv))
|
|
print()
|
|
|
|
def run_tests(self):
|
|
"""Run test cases"""
|
|
print("1..{}".format(len(self.testcases)))
|
|
for index, testcase in enumerate(self.testcases):
|
|
testcase.execute(index, self)
|
|
|
|
def find_and_parse_testcase_files(self):
|
|
"""Find and parse *.tc files"""
|
|
basedir = os.path.dirname(os.path.realpath(__file__))
|
|
pattern = os.path.join(basedir, 'tests', '**', '*.tc')
|
|
tc_files = sorted(glob.glob(pattern, recursive=True))
|
|
|
|
for tc_file in tc_files:
|
|
with open(tc_file, encoding='utf-8') as tc_fd:
|
|
# Test cases are separated by blank lines
|
|
testcases = tc_fd.read().split('\n\n')
|
|
self.extend(TestCase(tc_data) for tc_data in testcases)
|
|
|
|
def main():
|
|
"""Main routine adds test cases to the Test class and runs them"""
|
|
# Only run the tests on Linux platform
|
|
if platform.system() != 'Linux':
|
|
print('1..0 # Skipping tests on', platform.system())
|
|
return
|
|
|
|
with Test() as test:
|
|
test.find_and_parse_testcase_files()
|
|
test.run_tests()
|
|
|
|
if __name__ == '__main__':
|
|
main()
|