test_bpftool.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. # SPDX-License-Identifier: GPL-2.0
  2. # Copyright (c) 2020 SUSE LLC.
  3. import collections
  4. import functools
  5. import json
  6. import os
  7. import socket
  8. import subprocess
  9. import unittest
  10. # Add the source tree of bpftool and /usr/local/sbin to PATH
  11. cur_dir = os.path.dirname(os.path.realpath(__file__))
  12. bpftool_dir = os.path.abspath(os.path.join(cur_dir, "..", "..", "..", "..",
  13. "tools", "bpf", "bpftool"))
  14. os.environ["PATH"] = bpftool_dir + ":/usr/local/sbin:" + os.environ["PATH"]
  15. class IfaceNotFoundError(Exception):
  16. pass
  17. class UnprivilegedUserError(Exception):
  18. pass
  19. def _bpftool(args, json=True):
  20. _args = ["bpftool"]
  21. if json:
  22. _args.append("-j")
  23. _args.extend(args)
  24. return subprocess.check_output(_args)
  25. def bpftool(args):
  26. return _bpftool(args, json=False).decode("utf-8")
  27. def bpftool_json(args):
  28. res = _bpftool(args)
  29. return json.loads(res)
  30. def get_default_iface():
  31. for iface in socket.if_nameindex():
  32. if iface[1] != "lo":
  33. return iface[1]
  34. raise IfaceNotFoundError("Could not find any network interface to probe")
  35. def default_iface(f):
  36. @functools.wraps(f)
  37. def wrapper(*args, **kwargs):
  38. iface = get_default_iface()
  39. return f(*args, iface, **kwargs)
  40. return wrapper
  41. DMESG_EMITTING_HELPERS = [
  42. "bpf_probe_write_user",
  43. "bpf_trace_printk",
  44. "bpf_trace_vprintk",
  45. ]
  46. class TestBpftool(unittest.TestCase):
  47. @classmethod
  48. def setUpClass(cls):
  49. if os.getuid() != 0:
  50. raise UnprivilegedUserError(
  51. "This test suite needs root privileges")
  52. @default_iface
  53. def test_feature_dev_json(self, iface):
  54. unexpected_helpers = DMESG_EMITTING_HELPERS
  55. expected_keys = [
  56. "syscall_config",
  57. "program_types",
  58. "map_types",
  59. "helpers",
  60. "misc",
  61. ]
  62. res = bpftool_json(["feature", "probe", "dev", iface])
  63. # Check if the result has all expected keys.
  64. self.assertCountEqual(res.keys(), expected_keys)
  65. # Check if unexpected helpers are not included in helpers probes
  66. # result.
  67. for helpers in res["helpers"].values():
  68. for unexpected_helper in unexpected_helpers:
  69. self.assertNotIn(unexpected_helper, helpers)
  70. def test_feature_kernel(self):
  71. test_cases = [
  72. bpftool_json(["feature", "probe", "kernel"]),
  73. bpftool_json(["feature", "probe"]),
  74. bpftool_json(["feature"]),
  75. ]
  76. unexpected_helpers = DMESG_EMITTING_HELPERS
  77. expected_keys = [
  78. "syscall_config",
  79. "system_config",
  80. "program_types",
  81. "map_types",
  82. "helpers",
  83. "misc",
  84. ]
  85. for tc in test_cases:
  86. # Check if the result has all expected keys.
  87. self.assertCountEqual(tc.keys(), expected_keys)
  88. # Check if unexpected helpers are not included in helpers probes
  89. # result.
  90. for helpers in tc["helpers"].values():
  91. for unexpected_helper in unexpected_helpers:
  92. self.assertNotIn(unexpected_helper, helpers)
  93. def test_feature_kernel_full(self):
  94. test_cases = [
  95. bpftool_json(["feature", "probe", "kernel", "full"]),
  96. bpftool_json(["feature", "probe", "full"]),
  97. ]
  98. expected_helpers = DMESG_EMITTING_HELPERS
  99. for tc in test_cases:
  100. # Check if expected helpers are included at least once in any
  101. # helpers list for any program type. Unfortunately we cannot assume
  102. # that they will be included in all program types or a specific
  103. # subset of programs. It depends on the kernel version and
  104. # configuration.
  105. found_helpers = False
  106. for helpers in tc["helpers"].values():
  107. if all(expected_helper in helpers
  108. for expected_helper in expected_helpers):
  109. found_helpers = True
  110. break
  111. self.assertTrue(found_helpers)
  112. def test_feature_kernel_full_vs_not_full(self):
  113. full_res = bpftool_json(["feature", "probe", "full"])
  114. not_full_res = bpftool_json(["feature", "probe"])
  115. not_full_set = set()
  116. full_set = set()
  117. for helpers in full_res["helpers"].values():
  118. for helper in helpers:
  119. full_set.add(helper)
  120. for helpers in not_full_res["helpers"].values():
  121. for helper in helpers:
  122. not_full_set.add(helper)
  123. self.assertCountEqual(full_set - not_full_set,
  124. set(DMESG_EMITTING_HELPERS))
  125. self.assertCountEqual(not_full_set - full_set, set())
  126. def test_feature_macros(self):
  127. expected_patterns = [
  128. r"/\*\*\* System call availability \*\*\*/",
  129. r"#define HAVE_BPF_SYSCALL",
  130. r"/\*\*\* eBPF program types \*\*\*/",
  131. r"#define HAVE.*PROG_TYPE",
  132. r"/\*\*\* eBPF map types \*\*\*/",
  133. r"#define HAVE.*MAP_TYPE",
  134. r"/\*\*\* eBPF helper functions \*\*\*/",
  135. r"#define HAVE.*HELPER",
  136. r"/\*\*\* eBPF misc features \*\*\*/",
  137. ]
  138. res = bpftool(["feature", "probe", "macros"])
  139. for pattern in expected_patterns:
  140. self.assertRegex(res, pattern)