123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376 |
- #!/usr/bin/env python3
- # SPDX-License-Identifier: GPL-2.0
- #
- # Program to allow users to fuzz test Hyper-V drivers
- # by interfacing with Hyper-V debugfs attributes.
- # Current test methods available:
- # 1. delay testing
- #
- # Current file/directory structure of hyper-V debugfs:
- # /sys/kernel/debug/hyperv/UUID
- # /sys/kernel/debug/hyperv/UUID/<test-state filename>
- # /sys/kernel/debug/hyperv/UUID/<test-method sub-directory>
- #
- # author: Branden Bonaby <[email protected]>
- import os
- import cmd
- import argparse
- import glob
- from argparse import RawDescriptionHelpFormatter
- from argparse import RawTextHelpFormatter
- from enum import Enum
- # Do not change unless, you change the debugfs attributes
- # in /drivers/hv/debugfs.c. All fuzz testing
- # attributes will start with "fuzz_test".
- # debugfs path for hyperv must exist before proceeding
- debugfs_hyperv_path = "/sys/kernel/debug/hyperv"
- if not os.path.isdir(debugfs_hyperv_path):
- print("{} doesn't exist/check permissions".format(debugfs_hyperv_path))
- exit(-1)
- class dev_state(Enum):
- off = 0
- on = 1
- # File names, that correspond to the files created in
- # /drivers/hv/debugfs.c
- class f_names(Enum):
- state_f = "fuzz_test_state"
- buff_f = "fuzz_test_buffer_interrupt_delay"
- mess_f = "fuzz_test_message_delay"
- # Both single_actions and all_actions are used
- # for error checking and to allow for some subparser
- # names to be abbreviated. Do not abbreviate the
- # test method names, as it will become less intuitive
- # as to what the user can do. If you do decide to
- # abbreviate the test method name, make sure the main
- # function reflects this change.
- all_actions = [
- "disable_all",
- "D",
- "enable_all",
- "view_all",
- "V"
- ]
- single_actions = [
- "disable_single",
- "d",
- "enable_single",
- "view_single",
- "v"
- ]
- def main():
- file_map = recursive_file_lookup(debugfs_hyperv_path, dict())
- args = parse_args()
- if (not args.action):
- print ("Error, no options selected...exiting")
- exit(-1)
- arg_set = { k for (k,v) in vars(args).items() if v and k != "action" }
- arg_set.add(args.action)
- path = args.path if "path" in arg_set else None
- if (path and path[-1] == "/"):
- path = path[:-1]
- validate_args_path(path, arg_set, file_map)
- if (path and "enable_single" in arg_set):
- state_path = locate_state(path, file_map)
- set_test_state(state_path, dev_state.on.value, args.quiet)
- # Use subparsers as the key for different actions
- if ("delay" in arg_set):
- validate_delay_values(args.delay_time)
- if (args.enable_all):
- set_delay_all_devices(file_map, args.delay_time,
- args.quiet)
- else:
- set_delay_values(path, file_map, args.delay_time,
- args.quiet)
- elif ("disable_all" in arg_set or "D" in arg_set):
- disable_all_testing(file_map)
- elif ("disable_single" in arg_set or "d" in arg_set):
- disable_testing_single_device(path, file_map)
- elif ("view_all" in arg_set or "V" in arg_set):
- get_all_devices_test_status(file_map)
- elif ("view_single" in arg_set or "v" in arg_set):
- get_device_test_values(path, file_map)
- # Get the state location
- def locate_state(device, file_map):
- return file_map[device][f_names.state_f.value]
- # Validate delay values to make sure they are acceptable to
- # enable delays on a device
- def validate_delay_values(delay):
- if (delay[0] == -1 and delay[1] == -1):
- print("\nError, At least 1 value must be greater than 0")
- exit(-1)
- for i in delay:
- if (i < -1 or i == 0 or i > 1000):
- print("\nError, Values must be equal to -1 "
- "or be > 0 and <= 1000")
- exit(-1)
- # Validate argument path
- def validate_args_path(path, arg_set, file_map):
- if (not path and any(element in arg_set for element in single_actions)):
- print("Error, path (-p) REQUIRED for the specified option. "
- "Use (-h) to check usage.")
- exit(-1)
- elif (path and any(item in arg_set for item in all_actions)):
- print("Error, path (-p) NOT REQUIRED for the specified option. "
- "Use (-h) to check usage." )
- exit(-1)
- elif (path not in file_map and any(item in arg_set
- for item in single_actions)):
- print("Error, path '{}' not a valid vmbus device".format(path))
- exit(-1)
- # display Testing status of single device
- def get_device_test_values(path, file_map):
- for name in file_map[path]:
- file_location = file_map[path][name]
- print( name + " = " + str(read_test_files(file_location)))
- # Create a map of the vmbus devices and their associated files
- # [key=device, value = [key = filename, value = file path]]
- def recursive_file_lookup(path, file_map):
- for f_path in glob.iglob(path + '**/*'):
- if (os.path.isfile(f_path)):
- if (f_path.rsplit("/",2)[0] == debugfs_hyperv_path):
- directory = f_path.rsplit("/",1)[0]
- else:
- directory = f_path.rsplit("/",2)[0]
- f_name = f_path.split("/")[-1]
- if (file_map.get(directory)):
- file_map[directory].update({f_name:f_path})
- else:
- file_map[directory] = {f_name:f_path}
- elif (os.path.isdir(f_path)):
- recursive_file_lookup(f_path,file_map)
- return file_map
- # display Testing state of devices
- def get_all_devices_test_status(file_map):
- for device in file_map:
- if (get_test_state(locate_state(device, file_map)) == 1):
- print("Testing = ON for: {}"
- .format(device.split("/")[5]))
- else:
- print("Testing = OFF for: {}"
- .format(device.split("/")[5]))
- # read the vmbus device files, path must be absolute path before calling
- def read_test_files(path):
- try:
- with open(path,"r") as f:
- file_value = f.readline().strip()
- return int(file_value)
- except IOError as e:
- errno, strerror = e.args
- print("I/O error({0}): {1} on file {2}"
- .format(errno, strerror, path))
- exit(-1)
- except ValueError:
- print ("Element to int conversion error in: \n{}".format(path))
- exit(-1)
- # writing to vmbus device files, path must be absolute path before calling
- def write_test_files(path, value):
- try:
- with open(path,"w") as f:
- f.write("{}".format(value))
- except IOError as e:
- errno, strerror = e.args
- print("I/O error({0}): {1} on file {2}"
- .format(errno, strerror, path))
- exit(-1)
- # set testing state of device
- def set_test_state(state_path, state_value, quiet):
- write_test_files(state_path, state_value)
- if (get_test_state(state_path) == 1):
- if (not quiet):
- print("Testing = ON for device: {}"
- .format(state_path.split("/")[5]))
- else:
- if (not quiet):
- print("Testing = OFF for device: {}"
- .format(state_path.split("/")[5]))
- # get testing state of device
- def get_test_state(state_path):
- #state == 1 - test = ON
- #state == 0 - test = OFF
- return read_test_files(state_path)
- # write 1 - 1000 microseconds, into a single device using the
- # fuzz_test_buffer_interrupt_delay and fuzz_test_message_delay
- # debugfs attributes
- def set_delay_values(device, file_map, delay_length, quiet):
- try:
- interrupt = file_map[device][f_names.buff_f.value]
- message = file_map[device][f_names.mess_f.value]
- # delay[0]- buffer interrupt delay, delay[1]- message delay
- if (delay_length[0] >= 0 and delay_length[0] <= 1000):
- write_test_files(interrupt, delay_length[0])
- if (delay_length[1] >= 0 and delay_length[1] <= 1000):
- write_test_files(message, delay_length[1])
- if (not quiet):
- print("Buffer delay testing = {} for: {}"
- .format(read_test_files(interrupt),
- interrupt.split("/")[5]))
- print("Message delay testing = {} for: {}"
- .format(read_test_files(message),
- message.split("/")[5]))
- except IOError as e:
- errno, strerror = e.args
- print("I/O error({0}): {1} on files {2}{3}"
- .format(errno, strerror, interrupt, message))
- exit(-1)
- # enabling delay testing on all devices
- def set_delay_all_devices(file_map, delay, quiet):
- for device in (file_map):
- set_test_state(locate_state(device, file_map),
- dev_state.on.value,
- quiet)
- set_delay_values(device, file_map, delay, quiet)
- # disable all testing on a SINGLE device.
- def disable_testing_single_device(device, file_map):
- for name in file_map[device]:
- file_location = file_map[device][name]
- write_test_files(file_location, dev_state.off.value)
- print("ALL testing now OFF for {}".format(device.split("/")[-1]))
- # disable all testing on ALL devices
- def disable_all_testing(file_map):
- for device in file_map:
- disable_testing_single_device(device, file_map)
- def parse_args():
- parser = argparse.ArgumentParser(prog = "vmbus_testing",usage ="\n"
- "%(prog)s [delay] [-h] [-e|-E] -t [-p]\n"
- "%(prog)s [view_all | V] [-h]\n"
- "%(prog)s [disable_all | D] [-h]\n"
- "%(prog)s [disable_single | d] [-h|-p]\n"
- "%(prog)s [view_single | v] [-h|-p]\n"
- "%(prog)s --version\n",
- description = "\nUse lsvmbus to get vmbus device type "
- "information.\n" "\nThe debugfs root path is "
- "/sys/kernel/debug/hyperv",
- formatter_class = RawDescriptionHelpFormatter)
- subparsers = parser.add_subparsers(dest = "action")
- parser.add_argument("--version", action = "version",
- version = '%(prog)s 0.1.0')
- parser.add_argument("-q","--quiet", action = "store_true",
- help = "silence none important test messages."
- " This will only work when enabling testing"
- " on a device.")
- # Use the path parser to hold the --path attribute so it can
- # be shared between subparsers. Also do the same for the state
- # parser, as all testing methods will use --enable_all and
- # enable_single.
- path_parser = argparse.ArgumentParser(add_help=False)
- path_parser.add_argument("-p","--path", metavar = "",
- help = "Debugfs path to a vmbus device. The path "
- "must be the absolute path to the device.")
- state_parser = argparse.ArgumentParser(add_help=False)
- state_group = state_parser.add_mutually_exclusive_group(required = True)
- state_group.add_argument("-E", "--enable_all", action = "store_const",
- const = "enable_all",
- help = "Enable the specified test type "
- "on ALL vmbus devices.")
- state_group.add_argument("-e", "--enable_single",
- action = "store_const",
- const = "enable_single",
- help = "Enable the specified test type on a "
- "SINGLE vmbus device.")
- parser_delay = subparsers.add_parser("delay",
- parents = [state_parser, path_parser],
- help = "Delay the ring buffer interrupt or the "
- "ring buffer message reads in microseconds.",
- prog = "vmbus_testing",
- usage = "%(prog)s [-h]\n"
- "%(prog)s -E -t [value] [value]\n"
- "%(prog)s -e -t [value] [value] -p",
- description = "Delay the ring buffer interrupt for "
- "vmbus devices, or delay the ring buffer message "
- "reads for vmbus devices (both in microseconds). This "
- "is only on the host to guest channel.")
- parser_delay.add_argument("-t", "--delay_time", metavar = "", nargs = 2,
- type = check_range, default =[0,0], required = (True),
- help = "Set [buffer] & [message] delay time. "
- "Value constraints: -1 == value "
- "or 0 < value <= 1000.\n"
- "Use -1 to keep the previous value for that delay "
- "type, or a value > 0 <= 1000 to change the delay "
- "time.")
- parser_dis_all = subparsers.add_parser("disable_all",
- aliases = ['D'], prog = "vmbus_testing",
- usage = "%(prog)s [disable_all | D] -h\n"
- "%(prog)s [disable_all | D]\n",
- help = "Disable ALL testing on ALL vmbus devices.",
- description = "Disable ALL testing on ALL vmbus "
- "devices.")
- parser_dis_single = subparsers.add_parser("disable_single",
- aliases = ['d'],
- parents = [path_parser], prog = "vmbus_testing",
- usage = "%(prog)s [disable_single | d] -h\n"
- "%(prog)s [disable_single | d] -p\n",
- help = "Disable ALL testing on a SINGLE vmbus device.",
- description = "Disable ALL testing on a SINGLE vmbus "
- "device.")
- parser_view_all = subparsers.add_parser("view_all", aliases = ['V'],
- help = "View the test state for ALL vmbus devices.",
- prog = "vmbus_testing",
- usage = "%(prog)s [view_all | V] -h\n"
- "%(prog)s [view_all | V]\n",
- description = "This shows the test state for ALL the "
- "vmbus devices.")
- parser_view_single = subparsers.add_parser("view_single",
- aliases = ['v'],parents = [path_parser],
- help = "View the test values for a SINGLE vmbus "
- "device.",
- description = "This shows the test values for a SINGLE "
- "vmbus device.", prog = "vmbus_testing",
- usage = "%(prog)s [view_single | v] -h\n"
- "%(prog)s [view_single | v] -p")
- return parser.parse_args()
- # value checking for range checking input in parser
- def check_range(arg1):
- try:
- val = int(arg1)
- except ValueError as err:
- raise argparse.ArgumentTypeError(str(err))
- if val < -1 or val > 1000:
- message = ("\n\nvalue must be -1 or 0 < value <= 1000. "
- "Value program received: {}\n").format(val)
- raise argparse.ArgumentTypeError(message)
- return val
- if __name__ == "__main__":
- main()
|