You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
178 lines
5.4 KiB
178 lines
5.4 KiB
2 years ago
|
"""Tests for sys.audit and sys.addaudithook
|
||
|
"""
|
||
|
|
||
|
import subprocess
|
||
|
import sys
|
||
|
import unittest
|
||
|
from test import support
|
||
|
from test.support import import_helper
|
||
|
from test.support import os_helper
|
||
|
|
||
|
|
||
|
# Both audit entry points must exist on ``sys``; otherwise the whole module
# is skipped at import time.
if not (hasattr(sys, "addaudithook") and hasattr(sys, "audit")):
    raise unittest.SkipTest("test only relevant when sys.audit is available")

# Location of the helper script that the tests below run in a subprocess,
# as resolved by ``support.findfile``.
AUDIT_TESTS_PY = support.findfile("audit-tests.py")
|
||
|
|
||
|
|
||
|
class AuditTest(unittest.TestCase):
    """Drive the scenarios implemented in ``AUDIT_TESTS_PY``.

    Every test launches ``sys.executable`` on ``AUDIT_TESTS_PY`` with the
    scenario name (plus any extra arguments) and then inspects the child's
    exit status and/or the lines it printed on stdout.
    """

    def _run_audit_script(self, *args):
        """Run ``AUDIT_TESTS_PY`` with *args* in a child interpreter.

        Returns a ``(returncode, stdout, stderr)`` tuple where both streams
        are the complete text produced by the child, decoded as UTF-8.
        """
        with subprocess.Popen(
            # "-X" and its value are separate argv entries so that the child
            # receives the option exactly as it is written on a command line
            # (a single "-X utf8" element yields the value " utf8").
            [sys.executable, "-X", "utf8", AUDIT_TESTS_PY, *args],
            encoding="utf-8",
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        ) as proc:
            # communicate() drains both pipes while waiting.  A bare wait()
            # with two PIPEs can deadlock once the child fills a pipe buffer.
            out, err = proc.communicate()
        return proc.returncode, out, err

    def do_test(self, *args):
        """Run a scenario and fail if the child exits with a non-zero status.

        The child's stdout and stderr are echoed to this process's streams.
        On failure the captured stderr text is used as the failure message.
        """
        returncode, out, err = self._run_audit_script(*args)
        sys.stdout.write(out)
        sys.stderr.write(err)
        if returncode:
            # Use the captured text: the pipe itself can only be read once.
            self.fail(err)

    def run_python(self, *args):
        """Run a scenario and return ``(returncode, events, stderr)``.

        ``events`` holds one ``str.partition(" ")`` 3-tuple per stdout line
        (after stripping), i.e. ``(first_word, " ", rest)``.  ``stderr`` is
        the child's full stderr text, which is also echoed to ``sys.stderr``.
        """
        returncode, out, err = self._run_audit_script(*args)
        sys.stderr.write(err)
        events = [line.strip().partition(" ") for line in out.splitlines()]
        return returncode, events, err

    def test_basic(self):
        """The ``test_basic`` scenario must exit successfully."""
        self.do_test("test_basic")

    def test_block_add_hook(self):
        """The ``test_block_add_hook`` scenario must exit successfully."""
        self.do_test("test_block_add_hook")

    def test_block_add_hook_baseexception(self):
        """The ``test_block_add_hook_baseexception`` scenario must succeed."""
        self.do_test("test_block_add_hook_baseexception")

    def test_marshal(self):
        """Run the ``test_marshal`` scenario; skipped without ``marshal``."""
        import_helper.import_module("marshal")

        self.do_test("test_marshal")

    def test_pickle(self):
        """Run the ``test_pickle`` scenario; skipped without ``pickle``."""
        import_helper.import_module("pickle")

        self.do_test("test_pickle")

    def test_monkeypatch(self):
        """The ``test_monkeypatch`` scenario must exit successfully."""
        self.do_test("test_monkeypatch")

    def test_open(self):
        """Run ``test_open``, passing ``os_helper.TESTFN`` as its argument."""
        self.do_test("test_open", os_helper.TESTFN)

    def test_cantrace(self):
        """The ``test_cantrace`` scenario must exit successfully."""
        self.do_test("test_cantrace")

    def test_mmap(self):
        """The ``test_mmap`` scenario must exit successfully."""
        self.do_test("test_mmap")

    def test_excepthook(self):
        """The child must exit non-zero after one ``sys.excepthook`` event."""
        returncode, events, stderr = self.run_python("test_excepthook")
        if not returncode:
            self.fail(f"Expected fatal exception\n{stderr}")

        self.assertSequenceEqual(
            [("sys.excepthook", " ", "RuntimeError('fatal-error')")], events
        )

    def test_unraisablehook(self):
        """The first event must be ``sys.unraisablehook`` with its payload."""
        returncode, events, stderr = self.run_python("test_unraisablehook")
        if returncode:
            self.fail(stderr)

        self.assertEqual(events[0][0], "sys.unraisablehook")
        self.assertEqual(
            events[0][2],
            "RuntimeError('nonfatal-error') Exception ignored for audit hook test",
        )

    def test_winreg(self):
        """Check the ordered ``winreg`` events; skipped without ``winreg``."""
        import_helper.import_module("winreg")
        returncode, events, stderr = self.run_python("test_winreg")
        if returncode:
            self.fail(stderr)

        self.assertEqual(events[0][0], "winreg.OpenKey")
        self.assertEqual(events[1][0], "winreg.OpenKey/result")
        # The payload of the OpenKey/result event is expected to reappear in
        # the later EnumKey and Detach events.
        expected = events[1][2]
        self.assertTrue(expected)
        self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 0"], events[2])
        self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 10000"], events[3])
        self.assertSequenceEqual(["winreg.PyHKEY.Detach", " ", expected], events[4])

    def test_socket(self):
        """Check the first three ``socket`` events; skipped without ``socket``."""
        import_helper.import_module("socket")
        returncode, events, stderr = self.run_python("test_socket")
        if returncode:
            self.fail(stderr)

        if support.verbose:
            print(*events, sep='\n')
        self.assertEqual(events[0][0], "socket.gethostname")
        self.assertEqual(events[1][0], "socket.__new__")
        self.assertEqual(events[2][0], "socket.bind")
        self.assertTrue(events[2][2].endswith("('127.0.0.1', 8080)"))

    def test_gc(self):
        """The child must emit exactly the three expected ``gc`` events."""
        returncode, events, stderr = self.run_python("test_gc")
        if returncode:
            self.fail(stderr)

        if support.verbose:
            print(*events, sep='\n')
        self.assertEqual(
            [event[0] for event in events],
            ["gc.get_objects", "gc.get_referrers", "gc.get_referents"]
        )

    def test_http(self):
        """Check the ``http.client`` connect/send events of the child."""
        import_helper.import_module("http.client")
        returncode, events, stderr = self.run_python("test_http_client")
        if returncode:
            self.fail(stderr)

        if support.verbose:
            print(*events, sep='\n')
        self.assertEqual(events[0][0], "http.client.connect")
        self.assertEqual(events[0][2], "www.python.org 80")
        self.assertEqual(events[1][0], "http.client.send")
        # '[cannot send]' is presumably what the helper script reports when
        # the request could not be sent -- TODO confirm against the script.
        # Only inspect the payload when it is a real one.
        if events[1][2] != '[cannot send]':
            self.assertIn('HTTP', events[1][2])

    def test_sqlite3(self):
        """Check the ``sqlite3`` events; skipped when ``sqlite3`` is missing."""
        # Skip (rather than silently pass) when the module is unavailable,
        # matching how the other tests in this class handle optional modules.
        sqlite3 = import_helper.import_module("sqlite3")
        returncode, events, stderr = self.run_python("test_sqlite3")
        if returncode:
            self.fail(stderr)

        if support.verbose:
            print(*events, sep='\n')
        actual = [ev[0] for ev in events]
        expected = ["sqlite3.connect", "sqlite3.connect/handle"] * 2

        # The extension-loading events are only expected when this build of
        # sqlite3 exposes Connection.enable_load_extension.
        if hasattr(sqlite3.Connection, "enable_load_extension"):
            expected += [
                "sqlite3.enable_load_extension",
                "sqlite3.load_extension",
            ]
        self.assertEqual(actual, expected)
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
    # Executed directly as a script: hand control to unittest's runner.
    unittest.main()
|