"""Unit tests for WMI modules Some tests are optional, since they rely on remote machines and usernames / passwords. To enable these, copy wmitest.master.ini to wmitest.ini and set the parameters you have available. The watcher tests spawn temporary processes and temporary logical drives. These may get left behind. """ # # TODO: # - Test for negative timezone # - Test for share name with embedded single quote # import os, sys import datetime try: import ConfigParser except ImportError: import configparser as ConfigParser import operator try: import Queue except ImportError: import queue as Queue try: next except NameError: def next (iterator): return iterator.next () import subprocess import tempfile import threading import time import unittest import warnings import pythoncom import win32api import win32con import win32file import wmi ini = ConfigParser.SafeConfigParser () ini.read (["wmitest.master.ini", "wmitest.ini"]) settings = {} if ini.has_section ("settings"): settings.update (ini.items ("settings")) excludes = [i.strip () for i in settings.get ("excludes", "").split (",")] COMPUTERS = [None, "."] if "machine" in settings: COMPUTERS.append (settings['machine']) IMPERSONATION_LEVELS = [None, "identify", "impersonate", "delegate"] AUTHENTICATION_LEVELS = [None, "default", "none", "connect", "call", "pkt", "pktintegrity", "pktprivacy"] AUTHORITIES = [None] if set (["domain", "machine"]) <= set (settings): #~ AUTHORITIES.append ("kerberos:%s" % settings['domain']) AUTHORITIES.append ("ntlmdomain:%s" % settings['domain']) PRIVILEGES = [None, ['security', '!shutdown']] NAMESPACES = [None, "root/cimv2", "default"] class TestBasicConnections (unittest.TestCase): def test_basic_connection (self): "Check that a standard connection works" self.assert_ (wmi.WMI ()) def test_remote_connection (self): "Check that a remote connection works, if specified" if "machine" in settings: self.assert_ (wmi.WMI (settings['machine'])) else: warnings.warn ("Skipping test_remote_connection") def test_simple_moniker (self): "Check that a simple moniker works" self.assert_ (wmi.WMI (moniker="winmgmts:")) def test_moniker_with_class (self): "Check that specifying a class in moniker works" c0 = wmi.WMI ().Win32_ComputerSystem c1 = wmi.WMI (moniker="winmgmts:Win32_ComputerSystem") self.assert_ (c0 == c1) def test_moniker_with_instance (self): "Check that specifying an instance in the moniker works" for c0 in wmi.WMI ().Win32_ComputerSystem (): break c1 = wmi.WMI (moniker='winmgmts:Win32_ComputerSystem.Name="%s"' % c0.Name) self.assert_ (c0 == c1) def test_impersonation_levels (self): "Check that specifying an impersonation level works" for impersonation in IMPERSONATION_LEVELS: self.assert_ (wmi.WMI (impersonation_level=impersonation)) def test_authentication_levels (self): "Check that specifying an authentication level works" for authentication in AUTHENTICATION_LEVELS: try: c = wmi.WMI (authentication_level=authentication) except wmi.x_access_denied: warnings.warn ("Access denied for authentication level %s" % authentication) else: self.assert_ (c) def test_authority (self): "Check that specifying an authority works" for authority in AUTHORITIES: self.assert_ (wmi.WMI (authority=authority)) def test_privileges (self): "Check that specifying privileges works" for privileges in PRIVILEGES: self.assert_ (wmi.WMI (privileges=privileges)) def test_namespace (self): "Check that specifying a namespace works" for namespace in NAMESPACES: self.assert_ (wmi.WMI (namespace=namespace)) def test_suffix (self): "Check that a suffix returns the class of that name" self.assert_ (wmi.WMI (namespace="DEFAULT", suffix="StdRegProv") == wmi.WMI (namespace="DEFAULT").StdRegProv) def test_user_password (self): "Check that username & password are passed through for a remote connection" if set (["machine", "user", "password"]) <= set (settings): self.assert_ (wmi.WMI (computer=settings['machine'], user=settings['user'], password=settings['password'])) else: warnings.warn ("Skipping test_user_password because no machine, user or password") def test_too_much_authentication (self): "Check that user/password plus privs / suffix raises exception" self.assertRaises (wmi.x_wmi_authentication, wmi.WMI, computer='***', user="***", password="***", privileges=["***"]) self.assertRaises (wmi.x_wmi_authentication, wmi.WMI, computer='***', user="***", password="***", suffix="***") def test_user_password_with_impersonation_level (self): "Check that an impersonation level works with a username / password" if not (set (["machine", "user", "password"]) <= set (settings)): warnings.warn ("Skipping test_user_password_with_impersonation_level because no machine, user or password") else: self.assert_ ( wmi.WMI ( computer=settings['machine'], user=settings['user'], password=settings['password'], impersonation_level="impersonate" ) ) def test_user_password_with_invalid_impersonation_level (self): "Check that an impersonation level works with a username / password" if not (set (["machine", "user", "password"]) <= set (settings)): warnings.warn ("Skipping test_user_password_with_invalid_impersonation_level because no machine, user or password") else: self.assertRaises ( wmi.x_wmi_authentication, wmi.WMI, computer=settings['machine'], user=settings['user'], password=settings['password'], impersonation_level="***" ) def test_user_password_with_authentication_level (self): "Check that an invalid impersonation level raises x_wmi_authentication" if not (set (["machine", "user", "password"]) <= set (settings)): warnings.warn ("Skipping test_user_password_with_authentication_level because no machine, user or password") else: self.assert_ ( wmi.WMI ( computer=settings['machine'], user=settings['user'], password=settings['password'], authentication_level="pktIntegrity" ) ) def test_user_password_with_invalid_authentication_level (self): "Check that an invalid authentication level raises x_wmi_authentication" if not (set (["machine", "user", "password"]) <= set (settings)): warnings.warn ("Skipping test_user_password_with_invalid_authentication_level because no machine, user or password") else: self.assertRaises ( wmi.x_wmi_authentication, wmi.WMI, computer=settings['machine'], user=settings['user'], password=settings['password'], authentication_level="***" ) def test_local_user_password (self): "Check that user/password for local connection raises exception" self.assertRaises (wmi.x_wmi_authentication, wmi.WMI, user="***", password="***") def test_find_classes (self): "Check ability to switch class scan on and off" self.assert_ (wmi.WMI (find_classes=True)._classes) self.assertFalse (wmi.WMI (find_classes=False)._classes) def test_find_classes_false (self): "By default, don't scan for classes but load them on demand" self.assertFalse (wmi.WMI ()._classes) self.assert_ (wmi.WMI ().classes) def test_classes_acts_as_list (self): self.assert_ (wmi.WMI ().classes.index) def test_classes_acts_as_dict (self): self.assert_ (wmi.WMI ().classes.keys) class TestThreadedConnection (unittest.TestCase): def test_initialised_thread (self): """A WMI connection in a thread which has been initialised for COM should succeed. """ def f (q): pythoncom.CoInitialize () try: try: wmi.WMI () except: q.put (False) else: q.put (True) finally: pythoncom.CoUninitialize () q = Queue.Queue () threading.Thread (target=f, args=(q,)).start () self.assert_ (q.get ()) def test_uninitialised_thread (self): """A WMI connection in a thread which has not been initialised for COM should fail with a wmi-specific exception. """ def f (q): try: wmi.WMI () except wmi.x_wmi_uninitialised_thread: q.put (True) except: q.put (False) else: q.put (False) q = Queue.Queue () threading.Thread (target=f, args=(q,)).start () self.assert_ (q.get ()) class TestMoniker (unittest.TestCase): def test_moniker (self): """Look at all possible options for moniker construction and pass them through to a WMI connector """ for computer in COMPUTERS: if computer in (None, "."): local_authorities = [None] else: local_authorities = AUTHORITIES for impersonation_level in IMPERSONATION_LEVELS: for authentication_level in AUTHENTICATION_LEVELS: for authority in local_authorities: for privileges in PRIVILEGES: for namespace in NAMESPACES: moniker = wmi.construct_moniker ( computer=computer, impersonation_level=impersonation_level, authority=authority, privileges=privileges, namespace=namespace ) self.assert_ (wmi.WMI (moniker=moniker), "Moniker failed: %s" % moniker) def test_moniker_root_namespace (self): "Check that namespace is prefixed by root if needed" self.assertEquals (wmi.construct_moniker (namespace="default"), "winmgmts:root/default") self.assertEquals (wmi.construct_moniker (namespace="root/default"), "winmgmts:root/default") class TestFunctions (unittest.TestCase): times = [ ((2000, 1, 1), "20000101******.******+***"), ((2000, 1, 1, 10, 0, 0), "20000101100000.******+***"), ((2000, 1, 1, 10, 0, 0, 100), "20000101100000.000100+***"), ((2000, 1, 1, 10, 0, 0, 100, "GMT"), "20000101100000.000100+GMT") ] def test_signed_to_unsigned (self): tests = [ (0, 0), (-1, 0xffffffff), (+1, 1), (0x7fffffff, 0x7fffffff), (-0x7fffffff, 0x80000001) ] for signed, unsigned in tests: self.assertEquals (wmi.signed_to_unsigned (signed), unsigned) def test_from_1601 (self): "Check conversion from 100-ns intervals since 1601 (!)" self.assertEquals (wmi.from_1601 (0), datetime.datetime (1601, 1, 1)) self.assertEquals (wmi.from_1601 (24 * 60 * 60 * 10 * 1000 * 1000), datetime.datetime (1601, 1, 2)) def test_from_time (self): "Check conversion from time-tuple to time-string" for t, s in self.times: self.assertEquals (wmi.from_time (*t), s) def test_to_time (self): "Check conversion from time-string to time-tuple" for t, s in self.times: t = tuple (list (t) + ([None] * 8))[:8] self.assertEquals (wmi.to_time (s), t) def test_get_wmi_type (self): "Check that namespace, class & instance are identified correctly" self.assertEquals (wmi.get_wmi_type (wmi.WMI ()), "namespace") self.assertEquals (wmi.get_wmi_type (wmi.WMI ().Win32_ComputerSystem), "class") for i in wmi.WMI ().Win32_ComputerSystem (): self.assertEquals (wmi.get_wmi_type (i), "instance") def test_registry (self): """Convenience Registry function is identical to picking the StdRegProv class out of the DEFAULT namespace""" self.assertEquals (wmi.Registry (), wmi.WMI (namespace="DEFAULT").StdRegProv) class TestWMI (unittest.TestCase): def setUp (self): self.connection = wmi.WMI (namespace="root/cimv2", find_classes=False) self.logical_disks = set (self.connection.Win32_LogicalDisk ()) class TestNamespace (TestWMI): def test_subclasses_of_simple (self): self.assert_ ("Win32_ComputerSystem" in self.connection.subclasses_of ()) def test_subclasses_of_subtree (self): self.assert_ ("Win32_Desktop" in self.connection.subclasses_of ("CIM_Setting")) def test_subclasses_of_pattern (self): self.assert_ (set (["Win32_LogicalDisk", "Win32_MappedLogicalDisk"]) <= set (self.connection.subclasses_of ("CIM_LogicalDevice", "Win32_.*Disk"))) def test_instances (self): self.assertEquals (self.logical_disks, set (self.connection.instances ("Win32_LogicalDisk"))) def test_new (self): "Check this is an alias for the new method of the equivalent class" self.assertEquals (self.connection.new ("Win32_Process")._instance_of, self.connection.Win32_Process) def test_query (self): self.assertEquals (self.logical_disks, set (self.connection.query ("SELECT * FROM Win32_LogicalDisk"))) def test_ipython_attributes_with_find_classes (self): connection = wmi.WMI (find_classes=True) self.assertEquals (sorted (connection._getAttributeNames ()), sorted (i for i in connection.classes if not i.startswith ("__"))) def test_getattr (self): "Check that WMI classes are returned by attribute access on their namespace" connection = wmi.WMI (find_classes=True) for c in list (connection.classes)[:5]: wmi_class = getattr (connection, c) self.assert_ (isinstance (wmi_class, wmi._wmi_class)) self.assertEquals (wmi_class._class_name, c) def test_watch_for (self): """Check that the watch_for method returns a watcher. The watcher itself will be tested elsewhere. """ watcher = self.connection.watch_for ( wmi_class="Win32_Process" ) self.assert_ (isinstance (watcher, wmi._wmi_watcher)) class TestClass (TestWMI): def test_class_from_namespace (self): self.assert_ (self.connection.Win32_ComputerSystem._namespace is self.connection) def test_class_without_namespace (self): wmi_class = wmi.GetObject ("winmgmts:Win32_ComputerSystem") self.assert_ (wmi._wmi_class (None, wmi_class)._namespace) def test_query (self): self.assertEquals ( set (self.connection.Win32_ComputerSystem.query ()), set (self.connection.query ("SELECT * FROM Win32_ComputerSystem")) ) def test_query_with_where (self): this_drive = os.getcwd ()[:2] for drive in self.connection.Win32_LogicalDisk (Name=this_drive): self.assertEquals (drive.Name, this_drive) def test_query_with_fields (self): this_drive = os.getcwd ()[:2] properties = set (["MediaType"]) self.assert_ ("Name" not in properties) for drive in self.connection.Win32_LogicalDisk (properties, Name=this_drive): self.assertEquals (set (drive.properties), set (properties)) self.assert_ (drive.MediaType) self.assertRaises (AttributeError, getattr, drive, "Name") def test_watch_for (self): """Check that the watch_for method returns a watcher. The watcher itself will be tested elsewhere. """ watcher = self.connection.Win32_Process.watch_for () self.assert_ (isinstance (watcher, wmi._wmi_watcher)) def test_instances (self): self.assertEquals ( set (self.connection.Win32_LogicalDisk ()), set (self.connection.Win32_LogicalDisk.instances ()) ) def test_new (self): process = self.connection.Win32_Process.new () self.assertEquals (wmi.get_wmi_type (process), "instance") self.assertEquals (process._instance_of, self.connection.Win32_process) class TestWatcher (TestWMI): def new_letter (self): return \ set ("%s:" % chr (i) for i in range (ord ('A'), 1 + ord ('Z'))).\ difference (d.DeviceID for d in self.connection.Win32_LogicalDisk ()).\ pop () @staticmethod def create (new_letter): print "about to create drive with letter", new_letter here = os.path.dirname (os.path.abspath (__file__)) win32file.DefineDosDevice (0, new_letter, here) try: # # This sleep is needed for the WMI pollster to react # time.sleep (2) finally: win32file.DefineDosDevice (2, new_letter, here) def test_creation (self): try: new_letter = self.new_letter () except KeyError: warnings.warn ("Unable to find a spare drive letter to map.") return watcher = self.connection.Win32_LogicalDisk.watch_for ( notification_type="Creation", DeviceID=new_letter ) t = threading.Timer (2, self.create, (new_letter,)) t.start () found_disk = watcher (timeout_ms=20000) self.assert_ (isinstance (found_disk, wmi._wmi_object)) self.assertEqual (found_disk.Caption, new_letter) t.join () def test_event_with_no_params (self): try: new_letter = self.new_letter () except KeyError: warnings.warn ("Unable to find a spare drive letter to map.") return watcher = self.connection.Win32_LogicalDisk.watch_for () t = threading.Timer (2, self.create, (new_letter,)) t.start () found_disk = watcher (timeout_ms=20000) self.assert_ (isinstance (found_disk, wmi._wmi_object)) self.assertEqual (found_disk.Caption, new_letter) t.join () def test_valid_notification_types (self): for notification_type in ['operation', 'modification', 'creation', 'deletion']: self.assert_ (self.connection.Win32_LogicalDisk.watch_for (notification_type=notification_type)) def test_invalid_notification_types (self): self.assertRaises (wmi.x_wmi, self.connection.Win32_LogicalDisk.watch_for, notification_type="***") def do_not_test_extrinsic_event (self): # # This doesn't seem implementable at the moment # as a test. I can't find a reproducible extrinsic # event except for Win32_DeviceChangeEvent and that # one would require someone to, eg, plug in / unplug # a USB stick. # # It looks as though Win32_ProcessStartTrace should work # and it does on my laptop; just not on my desktop. # def _create (queue): queue.put (subprocess.Popen ([sys.executable, "-c", "import time; time.sleep (10)"])) watcher = self.connection.Win32_ProcessStartTrace.watch_for ( fields=["*"]##, #~ ProcessName=os.path.basename (sys.executable) ) q = Queue.Queue () t = threading.Timer (2, _create, (q,)) try: t.start () found_process = watcher (timeout_ms=20000) spawned_process = q.get_nowait () self.assert_ (isinstance (found_process, wmi._wmi_event)) self.assertEqual (int (found_process.ProcessID), spawned_process.pid) finally: t.cancel () class TestMethods (TestWMI): def test_exists (self): "Check that a well-known method is available by attribute" self.assert_ (self.connection.Win32_Process.Create) def test_params (self): "Check that the names and arrayness of params are picked up when not arrays" self.assertEquals ( [(n, False) for n in ["CommandLine", "CurrentDirectory", "ProcessStartupInformation"]], self.connection.Win32_Process.Create.in_parameter_names ) self.assertEquals ( [("ProcessId", False), ("ReturnValue", False)], self.connection.Win32_Process.Create.out_parameter_names ) def test_positional_params (self): dir = tempfile.mkdtemp () filename = "abc.txt" contents = str (datetime.datetime.now ()) handle, result = self.connection.Win32_Process.Create ( "cmd /c echo %s > %s" % (contents, filename), dir, self.connection.Win32_ProcessStartup.new (ShowWindow=0) ) time.sleep (0.5) self.assertEqual (open (os.path.join (dir, filename)).read (), contents + " \n") def test_named_params (self): dir = tempfile.mkdtemp () filename = "abc.txt" contents = str (datetime.datetime.now ()) handle, result = self.connection.Win32_Process.Create ( ProcessStartupInformation=self.connection.Win32_ProcessStartup.new (ShowWindow=0), CurrentDirectory=dir, CommandLine="cmd /c echo %s > %s" % (contents, filename) ) time.sleep (0.5) self.assertEqual (open (os.path.join (dir, filename)).read (), contents + " \n") def test_in_params_with_array (self): "Check that the names and arrayness of params are picked up when arrays" self.assertEquals ( [("DNSServerSearchOrder", True)], self.connection.Win32_NetworkAdapterConfiguration.SetDNSServerSearchOrder.in_parameter_names ) def test_instance_methods_are_distinct (self): """Check that the methods of difference instances of a class are distinct. This caused a problem when calling .Terminate on one process killed another. """ methods = [d.Reset for d in self.logical_disks] for i in range (len (methods)-1): self.assertNotEqual (methods[i], methods[i+1]) def test_call_from_class (self): "Check that a method can be called from a class" self.assert_ (self.connection.Win32_Process.Create ( CommandLine=sys.executable + " -c pass", ProcessStartupInformation=self.connection.Win32_ProcessStartup.new (ShowWindow=0) )) def test_call_from_instance (self): "Check that a method can be called from an instance" handle, _ = self.connection.Win32_Process.Create ( CommandLine=sys.executable, ProcessStartupInformation=self.connection.Win32_ProcessStartup.new (ShowWindow=0) ) result = 1 for p in self.connection.Win32_Process (Handle=handle): result, = p.Terminate () self.assertEqual (result, 0) class TestProperties (TestWMI): def test_access (self): "Check that all properties are available as attributes" for d in self.logical_disks: break for p in d.ole_object.Properties_: self.assertEqual (p.Value, getattr (d, p.Name)) def test_attribute_passthrough (self): "Check that unknown attributes are passed through to the underlying object" for d in self.logical_disks: break # # Can't rely on the COM Objects testing identical or equal; # have to check their values and their emptiness. # self.assert_ (d.Properties_) self.assert_ (d.ole_object.Properties_) self.assertEqual ( [p.Value for p in d.Properties_], [p.Value for p in d.ole_object.Properties_] ) def test_settable (self): "Check that a writeable property can be written" name = str (time.time ()).split (".")[0] old_value = "***" new_value = "!!!" username = win32api.GetUserNameEx (win32con.NameSamCompatible) self.assert_ (not self.connection.Win32_Environment (Name=name, UserName=username)) self.connection.Win32_Environment.new (Name=name, UserName=username, VariableValue=old_value).put () for envvar in self.connection.Win32_Environment (Name=name, UserName=username): self.assertEqual (envvar.VariableValue, old_value) envvar.VariableValue = new_value try: for envvar in self.connection.Win32_Environment (Name=name, UserName=username): self.assertEqual (envvar.VariableValue, new_value) finally: for envvar in self.connection.Win32_Environment (Name=name, UserName=username): envvar.VariableValue = None class TestInstances (TestWMI): def test_hashable (self): "Ensure instances are hashable so can be used in a set/dict" self.assert_ (dict.fromkeys (self.logical_disks)) def test_equalable (self): "Ensure instances compare equal" self.assertEqual (self.logical_disks, self.logical_disks) def test_sortable (self): "Ensure instances sort by full path/key" self.assertEqual ( sorted (self.logical_disks), sorted (self.logical_disks, key=operator.attrgetter ("DeviceID")) ) def test_references (self): "Ensure that associations are special-cased to return wrapped objects" for d in self.logical_disks: break for r in d.references ("Win32_LogicalDiskRootDirectory"): self.assert_ (r.is_association) self.assertEqual (r.GroupComponent, d) self.assert_ (isinstance (r.GroupComponent, wmi._wmi_object)) self.assert_ (isinstance (r.PartComponent, wmi._wmi_object)) def test_associators (self): "Ensure that associators are returned by association / result" for d in self.logical_disks: if d.DeviceID == os.path.abspath (__file__)[:2]: break else: raise RuntimeError ("Unable to find the logical drive corresponding to this file") root_dir = d.associators (wmi_association_class="Win32_LogicalDiskRootDirectory")[0] self.assertEqual (root_dir.Name.lower (), d.Name.lower () + "\\".lower ()) root_dir = d.associators (wmi_result_class="Win32_Directory")[0] self.assertEqual (root_dir.Name.lower (), d.Name.lower () + "\\") def test_derivation (self): "Check that derivation mimics WMI-provided Derivation_ property" for d in self.logical_disks: break self.assertEqual (d.derivation (), d.ole_object.Derivation_) def test_keys (self): "Check that the readonly keys property returns the keys for an object" self.assertEqual (self.connection.Win32_LogicalDisk.keys, ['DeviceID']) self.assertEqual (next (iter (self.logical_disks)).keys, ['DeviceID']) class TestInstanceCreation (TestWMI): def test_create_instance (self): self.assert_ (isinstance (self.connection.Win32_ProcessStartup.new (ShowWindow=2), wmi._wmi_object)) class TestAssociations (TestWMI): def test_all_properties_available (self): # # An association can contain not only the associated # classes but also extra information as well. Ensure # that both types of data are correctly handled. # for q in self.connection.Win32_DiskQuota (): for p in q.properties: try: getattr (q, p) except wmi.x_wmi: assert False, "Error getting %s from %s" % (p, q) else: assert True if __name__ == '__main__': unittest.main ()