You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
715 lines
25 KiB
715 lines
25 KiB
"""Unit tests for WMI modules
|
|
|
|
Some tests are optional, since they rely on remote machines and
|
|
usernames / passwords. To enable these, copy wmitest.master.ini
|
|
to wmitest.ini and set the parameters you have available.
|
|
|
|
The watcher tests spawn temporary processes and temporary
|
|
logical drives. These may get left behind.
|
|
"""
|
|
|
|
#
|
|
# TODO:
|
|
# - Test for negative timezone
|
|
# - Test for share name with embedded single quote
|
|
#
|
|
|
|
import os, sys
|
|
import datetime
|
|
try:
|
|
import ConfigParser
|
|
except ImportError:
|
|
import configparser as ConfigParser
|
|
import operator
|
|
try:
|
|
import Queue
|
|
except ImportError:
|
|
import queue as Queue
|
|
try:
|
|
next
|
|
except NameError:
|
|
def next (iterator): return iterator.next ()
|
|
import subprocess
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
import unittest
|
|
import warnings
|
|
|
|
import pythoncom
|
|
import win32api
|
|
import win32con
|
|
import win32file
|
|
|
|
import wmi
|
|
|
|
ini = ConfigParser.SafeConfigParser ()
|
|
ini.read (["wmitest.master.ini", "wmitest.ini"])
|
|
settings = {}
|
|
if ini.has_section ("settings"):
|
|
settings.update (ini.items ("settings"))
|
|
excludes = [i.strip () for i in settings.get ("excludes", "").split (",")]
|
|
|
|
COMPUTERS = [None, "."]
|
|
if "machine" in settings:
|
|
COMPUTERS.append (settings['machine'])
|
|
IMPERSONATION_LEVELS = [None, "identify", "impersonate", "delegate"]
|
|
AUTHENTICATION_LEVELS = [None, "default", "none", "connect", "call", "pkt", "pktintegrity", "pktprivacy"]
|
|
AUTHORITIES = [None]
|
|
if set (["domain", "machine"]) <= set (settings):
|
|
#~ AUTHORITIES.append ("kerberos:%s" % settings['domain'])
|
|
AUTHORITIES.append ("ntlmdomain:%s" % settings['domain'])
|
|
PRIVILEGES = [None, ['security', '!shutdown']]
|
|
NAMESPACES = [None, "root/cimv2", "default"]
|
|
|
|
class TestBasicConnections (unittest.TestCase):
|
|
|
|
def test_basic_connection (self):
|
|
"Check that a standard connection works"
|
|
self.assert_ (wmi.WMI ())
|
|
|
|
def test_remote_connection (self):
|
|
"Check that a remote connection works, if specified"
|
|
if "machine" in settings:
|
|
self.assert_ (wmi.WMI (settings['machine']))
|
|
else:
|
|
warnings.warn ("Skipping test_remote_connection")
|
|
|
|
def test_simple_moniker (self):
|
|
"Check that a simple moniker works"
|
|
self.assert_ (wmi.WMI (moniker="winmgmts:"))
|
|
|
|
def test_moniker_with_class (self):
|
|
"Check that specifying a class in moniker works"
|
|
c0 = wmi.WMI ().Win32_ComputerSystem
|
|
c1 = wmi.WMI (moniker="winmgmts:Win32_ComputerSystem")
|
|
self.assert_ (c0 == c1)
|
|
|
|
def test_moniker_with_instance (self):
|
|
"Check that specifying an instance in the moniker works"
|
|
for c0 in wmi.WMI ().Win32_ComputerSystem ():
|
|
break
|
|
c1 = wmi.WMI (moniker='winmgmts:Win32_ComputerSystem.Name="%s"' % c0.Name)
|
|
self.assert_ (c0 == c1)
|
|
|
|
def test_impersonation_levels (self):
|
|
"Check that specifying an impersonation level works"
|
|
for impersonation in IMPERSONATION_LEVELS:
|
|
self.assert_ (wmi.WMI (impersonation_level=impersonation))
|
|
|
|
def test_authentication_levels (self):
|
|
"Check that specifying an authentication level works"
|
|
for authentication in AUTHENTICATION_LEVELS:
|
|
try:
|
|
c = wmi.WMI (authentication_level=authentication)
|
|
except wmi.x_access_denied:
|
|
warnings.warn ("Access denied for authentication level %s" % authentication)
|
|
else:
|
|
self.assert_ (c)
|
|
|
|
def test_authority (self):
|
|
"Check that specifying an authority works"
|
|
for authority in AUTHORITIES:
|
|
self.assert_ (wmi.WMI (authority=authority))
|
|
|
|
def test_privileges (self):
|
|
"Check that specifying privileges works"
|
|
for privileges in PRIVILEGES:
|
|
self.assert_ (wmi.WMI (privileges=privileges))
|
|
|
|
def test_namespace (self):
|
|
"Check that specifying a namespace works"
|
|
for namespace in NAMESPACES:
|
|
self.assert_ (wmi.WMI (namespace=namespace))
|
|
|
|
def test_suffix (self):
|
|
"Check that a suffix returns the class of that name"
|
|
self.assert_ (wmi.WMI (namespace="DEFAULT", suffix="StdRegProv") == wmi.WMI (namespace="DEFAULT").StdRegProv)
|
|
|
|
def test_user_password (self):
|
|
"Check that username & password are passed through for a remote connection"
|
|
if set (["machine", "user", "password"]) <= set (settings):
|
|
self.assert_ (wmi.WMI (computer=settings['machine'], user=settings['user'], password=settings['password']))
|
|
else:
|
|
warnings.warn ("Skipping test_user_password because no machine, user or password")
|
|
|
|
def test_too_much_authentication (self):
|
|
"Check that user/password plus privs / suffix raises exception"
|
|
self.assertRaises (wmi.x_wmi_authentication, wmi.WMI, computer='***', user="***", password="***", privileges=["***"])
|
|
self.assertRaises (wmi.x_wmi_authentication, wmi.WMI, computer='***', user="***", password="***", suffix="***")
|
|
|
|
def test_user_password_with_impersonation_level (self):
|
|
"Check that an impersonation level works with a username / password"
|
|
if not (set (["machine", "user", "password"]) <= set (settings)):
|
|
warnings.warn ("Skipping test_user_password_with_impersonation_level because no machine, user or password")
|
|
else:
|
|
self.assert_ (
|
|
wmi.WMI (
|
|
computer=settings['machine'],
|
|
user=settings['user'],
|
|
password=settings['password'],
|
|
impersonation_level="impersonate"
|
|
)
|
|
)
|
|
|
|
def test_user_password_with_invalid_impersonation_level (self):
|
|
"Check that an impersonation level works with a username / password"
|
|
if not (set (["machine", "user", "password"]) <= set (settings)):
|
|
warnings.warn ("Skipping test_user_password_with_invalid_impersonation_level because no machine, user or password")
|
|
else:
|
|
self.assertRaises (
|
|
wmi.x_wmi_authentication,
|
|
wmi.WMI,
|
|
computer=settings['machine'],
|
|
user=settings['user'],
|
|
password=settings['password'],
|
|
impersonation_level="***"
|
|
)
|
|
|
|
def test_user_password_with_authentication_level (self):
|
|
"Check that an invalid impersonation level raises x_wmi_authentication"
|
|
if not (set (["machine", "user", "password"]) <= set (settings)):
|
|
warnings.warn ("Skipping test_user_password_with_authentication_level because no machine, user or password")
|
|
else:
|
|
self.assert_ (
|
|
wmi.WMI (
|
|
computer=settings['machine'],
|
|
user=settings['user'],
|
|
password=settings['password'],
|
|
authentication_level="pktIntegrity"
|
|
)
|
|
)
|
|
|
|
def test_user_password_with_invalid_authentication_level (self):
|
|
"Check that an invalid authentication level raises x_wmi_authentication"
|
|
if not (set (["machine", "user", "password"]) <= set (settings)):
|
|
warnings.warn ("Skipping test_user_password_with_invalid_authentication_level because no machine, user or password")
|
|
else:
|
|
self.assertRaises (
|
|
wmi.x_wmi_authentication,
|
|
wmi.WMI,
|
|
computer=settings['machine'],
|
|
user=settings['user'],
|
|
password=settings['password'],
|
|
authentication_level="***"
|
|
)
|
|
|
|
def test_local_user_password (self):
|
|
"Check that user/password for local connection raises exception"
|
|
self.assertRaises (wmi.x_wmi_authentication, wmi.WMI, user="***", password="***")
|
|
|
|
def test_find_classes (self):
|
|
"Check ability to switch class scan on and off"
|
|
self.assert_ (wmi.WMI (find_classes=True)._classes)
|
|
self.assertFalse (wmi.WMI (find_classes=False)._classes)
|
|
|
|
def test_find_classes_false (self):
|
|
"By default, don't scan for classes but load them on demand"
|
|
self.assertFalse (wmi.WMI ()._classes)
|
|
self.assert_ (wmi.WMI ().classes)
|
|
|
|
def test_classes_acts_as_list (self):
|
|
self.assert_ (wmi.WMI ().classes.index)
|
|
|
|
def test_classes_acts_as_dict (self):
|
|
self.assert_ (wmi.WMI ().classes.keys)
|
|
|
|
class TestThreadedConnection (unittest.TestCase):
|
|
|
|
|
|
def test_initialised_thread (self):
|
|
"""A WMI connection in a thread which has been initialised for COM
|
|
should succeed.
|
|
"""
|
|
def f (q):
|
|
pythoncom.CoInitialize ()
|
|
try:
|
|
try:
|
|
wmi.WMI ()
|
|
except:
|
|
q.put (False)
|
|
else:
|
|
q.put (True)
|
|
finally:
|
|
pythoncom.CoUninitialize ()
|
|
|
|
q = Queue.Queue ()
|
|
threading.Thread (target=f, args=(q,)).start ()
|
|
self.assert_ (q.get ())
|
|
|
|
def test_uninitialised_thread (self):
|
|
"""A WMI connection in a thread which has not been initialised
|
|
for COM should fail with a wmi-specific exception.
|
|
"""
|
|
def f (q):
|
|
try:
|
|
wmi.WMI ()
|
|
except wmi.x_wmi_uninitialised_thread:
|
|
q.put (True)
|
|
except:
|
|
q.put (False)
|
|
else:
|
|
q.put (False)
|
|
|
|
q = Queue.Queue ()
|
|
threading.Thread (target=f, args=(q,)).start ()
|
|
self.assert_ (q.get ())
|
|
|
|
class TestMoniker (unittest.TestCase):
|
|
|
|
def test_moniker (self):
|
|
"""Look at all possible options for moniker construction and pass
|
|
them through to a WMI connector
|
|
"""
|
|
for computer in COMPUTERS:
|
|
if computer in (None, "."):
|
|
local_authorities = [None]
|
|
else:
|
|
local_authorities = AUTHORITIES
|
|
for impersonation_level in IMPERSONATION_LEVELS:
|
|
for authentication_level in AUTHENTICATION_LEVELS:
|
|
for authority in local_authorities:
|
|
for privileges in PRIVILEGES:
|
|
for namespace in NAMESPACES:
|
|
moniker = wmi.construct_moniker (
|
|
computer=computer,
|
|
impersonation_level=impersonation_level,
|
|
authority=authority,
|
|
privileges=privileges,
|
|
namespace=namespace
|
|
)
|
|
self.assert_ (wmi.WMI (moniker=moniker), "Moniker failed: %s" % moniker)
|
|
|
|
def test_moniker_root_namespace (self):
|
|
"Check that namespace is prefixed by root if needed"
|
|
self.assertEquals (wmi.construct_moniker (namespace="default"), "winmgmts:root/default")
|
|
self.assertEquals (wmi.construct_moniker (namespace="root/default"), "winmgmts:root/default")
|
|
|
|
class TestFunctions (unittest.TestCase):
|
|
|
|
times = [
|
|
((2000, 1, 1), "20000101******.******+***"),
|
|
((2000, 1, 1, 10, 0, 0), "20000101100000.******+***"),
|
|
((2000, 1, 1, 10, 0, 0, 100), "20000101100000.000100+***"),
|
|
((2000, 1, 1, 10, 0, 0, 100, "GMT"), "20000101100000.000100+GMT")
|
|
]
|
|
|
|
def test_signed_to_unsigned (self):
|
|
tests = [
|
|
(0, 0),
|
|
(-1, 0xffffffff),
|
|
(+1, 1),
|
|
(0x7fffffff, 0x7fffffff),
|
|
(-0x7fffffff, 0x80000001)
|
|
]
|
|
for signed, unsigned in tests:
|
|
self.assertEquals (wmi.signed_to_unsigned (signed), unsigned)
|
|
|
|
def test_from_1601 (self):
|
|
"Check conversion from 100-ns intervals since 1601 (!)"
|
|
self.assertEquals (wmi.from_1601 (0), datetime.datetime (1601, 1, 1))
|
|
self.assertEquals (wmi.from_1601 (24 * 60 * 60 * 10 * 1000 * 1000), datetime.datetime (1601, 1, 2))
|
|
|
|
def test_from_time (self):
|
|
"Check conversion from time-tuple to time-string"
|
|
for t, s in self.times:
|
|
self.assertEquals (wmi.from_time (*t), s)
|
|
|
|
def test_to_time (self):
|
|
"Check conversion from time-string to time-tuple"
|
|
for t, s in self.times:
|
|
t = tuple (list (t) + ([None] * 8))[:8]
|
|
self.assertEquals (wmi.to_time (s), t)
|
|
|
|
def test_get_wmi_type (self):
|
|
"Check that namespace, class & instance are identified correctly"
|
|
self.assertEquals (wmi.get_wmi_type (wmi.WMI ()), "namespace")
|
|
self.assertEquals (wmi.get_wmi_type (wmi.WMI ().Win32_ComputerSystem), "class")
|
|
for i in wmi.WMI ().Win32_ComputerSystem ():
|
|
self.assertEquals (wmi.get_wmi_type (i), "instance")
|
|
|
|
def test_registry (self):
|
|
"""Convenience Registry function is identical to picking
|
|
the StdRegProv class out of the DEFAULT namespace"""
|
|
self.assertEquals (wmi.Registry (), wmi.WMI (namespace="DEFAULT").StdRegProv)
|
|
|
|
class TestWMI (unittest.TestCase):
|
|
|
|
def setUp (self):
|
|
self.connection = wmi.WMI (namespace="root/cimv2", find_classes=False)
|
|
self.logical_disks = set (self.connection.Win32_LogicalDisk ())
|
|
|
|
class TestNamespace (TestWMI):
|
|
|
|
def test_subclasses_of_simple (self):
|
|
self.assert_ ("Win32_ComputerSystem" in self.connection.subclasses_of ())
|
|
|
|
def test_subclasses_of_subtree (self):
|
|
self.assert_ ("Win32_Desktop" in self.connection.subclasses_of ("CIM_Setting"))
|
|
|
|
def test_subclasses_of_pattern (self):
|
|
self.assert_ (set (["Win32_LogicalDisk", "Win32_MappedLogicalDisk"]) <= set (self.connection.subclasses_of ("CIM_LogicalDevice", "Win32_.*Disk")))
|
|
|
|
def test_instances (self):
|
|
self.assertEquals (self.logical_disks, set (self.connection.instances ("Win32_LogicalDisk")))
|
|
|
|
def test_new (self):
|
|
"Check this is an alias for the new method of the equivalent class"
|
|
self.assertEquals (self.connection.new ("Win32_Process")._instance_of, self.connection.Win32_Process)
|
|
|
|
def test_query (self):
|
|
self.assertEquals (self.logical_disks, set (self.connection.query ("SELECT * FROM Win32_LogicalDisk")))
|
|
|
|
def test_ipython_attributes_with_find_classes (self):
|
|
connection = wmi.WMI (find_classes=True)
|
|
self.assertEquals (sorted (connection._getAttributeNames ()), sorted (i for i in connection.classes if not i.startswith ("__")))
|
|
|
|
def test_getattr (self):
|
|
"Check that WMI classes are returned by attribute access on their namespace"
|
|
connection = wmi.WMI (find_classes=True)
|
|
for c in list (connection.classes)[:5]:
|
|
wmi_class = getattr (connection, c)
|
|
self.assert_ (isinstance (wmi_class, wmi._wmi_class))
|
|
self.assertEquals (wmi_class._class_name, c)
|
|
|
|
def test_watch_for (self):
|
|
"""Check that the watch_for method returns a watcher. The watcher itself
|
|
will be tested elsewhere.
|
|
"""
|
|
watcher = self.connection.watch_for (
|
|
wmi_class="Win32_Process"
|
|
)
|
|
self.assert_ (isinstance (watcher, wmi._wmi_watcher))
|
|
|
|
class TestClass (TestWMI):
|
|
|
|
def test_class_from_namespace (self):
|
|
self.assert_ (self.connection.Win32_ComputerSystem._namespace is self.connection)
|
|
|
|
def test_class_without_namespace (self):
|
|
wmi_class = wmi.GetObject ("winmgmts:Win32_ComputerSystem")
|
|
self.assert_ (wmi._wmi_class (None, wmi_class)._namespace)
|
|
|
|
def test_query (self):
|
|
self.assertEquals (
|
|
set (self.connection.Win32_ComputerSystem.query ()),
|
|
set (self.connection.query ("SELECT * FROM Win32_ComputerSystem"))
|
|
)
|
|
|
|
def test_query_with_where (self):
|
|
this_drive = os.getcwd ()[:2]
|
|
for drive in self.connection.Win32_LogicalDisk (Name=this_drive):
|
|
self.assertEquals (drive.Name, this_drive)
|
|
|
|
def test_query_with_fields (self):
|
|
this_drive = os.getcwd ()[:2]
|
|
properties = set (["MediaType"])
|
|
self.assert_ ("Name" not in properties)
|
|
for drive in self.connection.Win32_LogicalDisk (properties, Name=this_drive):
|
|
self.assertEquals (set (drive.properties), set (properties))
|
|
self.assert_ (drive.MediaType)
|
|
self.assertRaises (AttributeError, getattr, drive, "Name")
|
|
|
|
def test_watch_for (self):
|
|
"""Check that the watch_for method returns a watcher. The watcher itself
|
|
will be tested elsewhere.
|
|
"""
|
|
watcher = self.connection.Win32_Process.watch_for ()
|
|
self.assert_ (isinstance (watcher, wmi._wmi_watcher))
|
|
|
|
def test_instances (self):
|
|
self.assertEquals (
|
|
set (self.connection.Win32_LogicalDisk ()),
|
|
set (self.connection.Win32_LogicalDisk.instances ())
|
|
)
|
|
|
|
def test_new (self):
|
|
process = self.connection.Win32_Process.new ()
|
|
self.assertEquals (wmi.get_wmi_type (process), "instance")
|
|
self.assertEquals (process._instance_of, self.connection.Win32_process)
|
|
|
|
|
|
class TestWatcher (TestWMI):
|
|
|
|
def new_letter (self):
|
|
return \
|
|
set ("%s:" % chr (i) for i in range (ord ('A'), 1 + ord ('Z'))).\
|
|
difference (d.DeviceID for d in self.connection.Win32_LogicalDisk ()).\
|
|
pop ()
|
|
|
|
@staticmethod
|
|
def create (new_letter):
|
|
print "about to create drive with letter", new_letter
|
|
here = os.path.dirname (os.path.abspath (__file__))
|
|
win32file.DefineDosDevice (0, new_letter, here)
|
|
try:
|
|
#
|
|
# This sleep is needed for the WMI pollster to react
|
|
#
|
|
time.sleep (2)
|
|
finally:
|
|
win32file.DefineDosDevice (2, new_letter, here)
|
|
|
|
def test_creation (self):
|
|
try:
|
|
new_letter = self.new_letter ()
|
|
except KeyError:
|
|
warnings.warn ("Unable to find a spare drive letter to map.")
|
|
return
|
|
|
|
watcher = self.connection.Win32_LogicalDisk.watch_for (
|
|
notification_type="Creation",
|
|
DeviceID=new_letter
|
|
)
|
|
t = threading.Timer (2, self.create, (new_letter,))
|
|
t.start ()
|
|
found_disk = watcher (timeout_ms=20000)
|
|
self.assert_ (isinstance (found_disk, wmi._wmi_object))
|
|
self.assertEqual (found_disk.Caption, new_letter)
|
|
t.join ()
|
|
|
|
def test_event_with_no_params (self):
|
|
try:
|
|
new_letter = self.new_letter ()
|
|
except KeyError:
|
|
warnings.warn ("Unable to find a spare drive letter to map.")
|
|
return
|
|
|
|
watcher = self.connection.Win32_LogicalDisk.watch_for ()
|
|
t = threading.Timer (2, self.create, (new_letter,))
|
|
t.start ()
|
|
found_disk = watcher (timeout_ms=20000)
|
|
self.assert_ (isinstance (found_disk, wmi._wmi_object))
|
|
self.assertEqual (found_disk.Caption, new_letter)
|
|
t.join ()
|
|
|
|
def test_valid_notification_types (self):
|
|
for notification_type in ['operation', 'modification', 'creation', 'deletion']:
|
|
self.assert_ (self.connection.Win32_LogicalDisk.watch_for (notification_type=notification_type))
|
|
|
|
def test_invalid_notification_types (self):
|
|
self.assertRaises (wmi.x_wmi, self.connection.Win32_LogicalDisk.watch_for, notification_type="***")
|
|
|
|
|
|
def do_not_test_extrinsic_event (self):
|
|
|
|
#
|
|
# This doesn't seem implementable at the moment
|
|
# as a test. I can't find a reproducible extrinsic
|
|
# event except for Win32_DeviceChangeEvent and that
|
|
# one would require someone to, eg, plug in / unplug
|
|
# a USB stick.
|
|
#
|
|
# It looks as though Win32_ProcessStartTrace should work
|
|
# and it does on my laptop; just not on my desktop.
|
|
#
|
|
|
|
def _create (queue):
|
|
queue.put (subprocess.Popen ([sys.executable, "-c", "import time; time.sleep (10)"]))
|
|
|
|
watcher = self.connection.Win32_ProcessStartTrace.watch_for (
|
|
fields=["*"]##,
|
|
#~ ProcessName=os.path.basename (sys.executable)
|
|
)
|
|
q = Queue.Queue ()
|
|
t = threading.Timer (2, _create, (q,))
|
|
try:
|
|
t.start ()
|
|
found_process = watcher (timeout_ms=20000)
|
|
spawned_process = q.get_nowait ()
|
|
self.assert_ (isinstance (found_process, wmi._wmi_event))
|
|
self.assertEqual (int (found_process.ProcessID), spawned_process.pid)
|
|
finally:
|
|
t.cancel ()
|
|
|
|
class TestMethods (TestWMI):
|
|
|
|
def test_exists (self):
|
|
"Check that a well-known method is available by attribute"
|
|
self.assert_ (self.connection.Win32_Process.Create)
|
|
|
|
def test_params (self):
|
|
"Check that the names and arrayness of params are picked up when not arrays"
|
|
self.assertEquals (
|
|
[(n, False) for n in ["CommandLine", "CurrentDirectory", "ProcessStartupInformation"]],
|
|
self.connection.Win32_Process.Create.in_parameter_names
|
|
)
|
|
self.assertEquals (
|
|
[("ProcessId", False), ("ReturnValue", False)],
|
|
self.connection.Win32_Process.Create.out_parameter_names
|
|
)
|
|
|
|
def test_positional_params (self):
|
|
dir = tempfile.mkdtemp ()
|
|
filename = "abc.txt"
|
|
contents = str (datetime.datetime.now ())
|
|
handle, result = self.connection.Win32_Process.Create (
|
|
"cmd /c echo %s > %s" % (contents, filename),
|
|
dir,
|
|
self.connection.Win32_ProcessStartup.new (ShowWindow=0)
|
|
)
|
|
time.sleep (0.5)
|
|
self.assertEqual (open (os.path.join (dir, filename)).read (), contents + " \n")
|
|
|
|
def test_named_params (self):
|
|
dir = tempfile.mkdtemp ()
|
|
filename = "abc.txt"
|
|
contents = str (datetime.datetime.now ())
|
|
handle, result = self.connection.Win32_Process.Create (
|
|
ProcessStartupInformation=self.connection.Win32_ProcessStartup.new (ShowWindow=0),
|
|
CurrentDirectory=dir,
|
|
CommandLine="cmd /c echo %s > %s" % (contents, filename)
|
|
)
|
|
time.sleep (0.5)
|
|
self.assertEqual (open (os.path.join (dir, filename)).read (), contents + " \n")
|
|
|
|
def test_in_params_with_array (self):
|
|
"Check that the names and arrayness of params are picked up when arrays"
|
|
self.assertEquals (
|
|
[("DNSServerSearchOrder", True)],
|
|
self.connection.Win32_NetworkAdapterConfiguration.SetDNSServerSearchOrder.in_parameter_names
|
|
)
|
|
|
|
def test_instance_methods_are_distinct (self):
|
|
"""Check that the methods of difference instances of a class are distinct.
|
|
This caused a problem when calling .Terminate on one process killed another.
|
|
"""
|
|
methods = [d.Reset for d in self.logical_disks]
|
|
for i in range (len (methods)-1):
|
|
self.assertNotEqual (methods[i], methods[i+1])
|
|
|
|
def test_call_from_class (self):
|
|
"Check that a method can be called from a class"
|
|
self.assert_ (self.connection.Win32_Process.Create (
|
|
CommandLine=sys.executable + " -c pass",
|
|
ProcessStartupInformation=self.connection.Win32_ProcessStartup.new (ShowWindow=0)
|
|
))
|
|
|
|
def test_call_from_instance (self):
|
|
"Check that a method can be called from an instance"
|
|
handle, _ = self.connection.Win32_Process.Create (
|
|
CommandLine=sys.executable,
|
|
ProcessStartupInformation=self.connection.Win32_ProcessStartup.new (ShowWindow=0)
|
|
)
|
|
result = 1
|
|
for p in self.connection.Win32_Process (Handle=handle):
|
|
result, = p.Terminate ()
|
|
self.assertEqual (result, 0)
|
|
|
|
class TestProperties (TestWMI):
|
|
|
|
def test_access (self):
|
|
"Check that all properties are available as attributes"
|
|
for d in self.logical_disks:
|
|
break
|
|
for p in d.ole_object.Properties_:
|
|
self.assertEqual (p.Value, getattr (d, p.Name))
|
|
|
|
def test_attribute_passthrough (self):
|
|
"Check that unknown attributes are passed through to the underlying object"
|
|
for d in self.logical_disks:
|
|
break
|
|
#
|
|
# Can't rely on the COM Objects testing identical or equal;
|
|
# have to check their values and their emptiness.
|
|
#
|
|
self.assert_ (d.Properties_)
|
|
self.assert_ (d.ole_object.Properties_)
|
|
self.assertEqual (
|
|
[p.Value for p in d.Properties_],
|
|
[p.Value for p in d.ole_object.Properties_]
|
|
)
|
|
|
|
def test_settable (self):
|
|
"Check that a writeable property can be written"
|
|
name = str (time.time ()).split (".")[0]
|
|
old_value = "***"
|
|
new_value = "!!!"
|
|
username = win32api.GetUserNameEx (win32con.NameSamCompatible)
|
|
self.assert_ (not self.connection.Win32_Environment (Name=name, UserName=username))
|
|
self.connection.Win32_Environment.new (Name=name, UserName=username, VariableValue=old_value).put ()
|
|
for envvar in self.connection.Win32_Environment (Name=name, UserName=username):
|
|
self.assertEqual (envvar.VariableValue, old_value)
|
|
envvar.VariableValue = new_value
|
|
try:
|
|
for envvar in self.connection.Win32_Environment (Name=name, UserName=username):
|
|
self.assertEqual (envvar.VariableValue, new_value)
|
|
finally:
|
|
for envvar in self.connection.Win32_Environment (Name=name, UserName=username):
|
|
envvar.VariableValue = None
|
|
|
|
class TestInstances (TestWMI):
|
|
|
|
def test_hashable (self):
|
|
"Ensure instances are hashable so can be used in a set/dict"
|
|
self.assert_ (dict.fromkeys (self.logical_disks))
|
|
|
|
def test_equalable (self):
|
|
"Ensure instances compare equal"
|
|
self.assertEqual (self.logical_disks, self.logical_disks)
|
|
|
|
def test_sortable (self):
|
|
"Ensure instances sort by full path/key"
|
|
self.assertEqual (
|
|
sorted (self.logical_disks),
|
|
sorted (self.logical_disks, key=operator.attrgetter ("DeviceID"))
|
|
)
|
|
|
|
def test_references (self):
|
|
"Ensure that associations are special-cased to return wrapped objects"
|
|
for d in self.logical_disks:
|
|
break
|
|
for r in d.references ("Win32_LogicalDiskRootDirectory"):
|
|
self.assert_ (r.is_association)
|
|
self.assertEqual (r.GroupComponent, d)
|
|
self.assert_ (isinstance (r.GroupComponent, wmi._wmi_object))
|
|
self.assert_ (isinstance (r.PartComponent, wmi._wmi_object))
|
|
|
|
def test_associators (self):
|
|
"Ensure that associators are returned by association / result"
|
|
for d in self.logical_disks:
|
|
if d.DeviceID == os.path.abspath (__file__)[:2]:
|
|
break
|
|
else:
|
|
raise RuntimeError ("Unable to find the logical drive corresponding to this file")
|
|
root_dir = d.associators (wmi_association_class="Win32_LogicalDiskRootDirectory")[0]
|
|
self.assertEqual (root_dir.Name.lower (), d.Name.lower () + "\\".lower ())
|
|
root_dir = d.associators (wmi_result_class="Win32_Directory")[0]
|
|
self.assertEqual (root_dir.Name.lower (), d.Name.lower () + "\\")
|
|
|
|
def test_derivation (self):
|
|
"Check that derivation mimics WMI-provided Derivation_ property"
|
|
for d in self.logical_disks:
|
|
break
|
|
self.assertEqual (d.derivation (), d.ole_object.Derivation_)
|
|
|
|
def test_keys (self):
|
|
"Check that the readonly keys property returns the keys for an object"
|
|
self.assertEqual (self.connection.Win32_LogicalDisk.keys, ['DeviceID'])
|
|
self.assertEqual (next (iter (self.logical_disks)).keys, ['DeviceID'])
|
|
|
|
class TestInstanceCreation (TestWMI):
|
|
|
|
def test_create_instance (self):
|
|
self.assert_ (isinstance (self.connection.Win32_ProcessStartup.new (ShowWindow=2), wmi._wmi_object))
|
|
|
|
class TestAssociations (TestWMI):
|
|
|
|
def test_all_properties_available (self):
|
|
#
|
|
# An association can contain not only the associated
|
|
# classes but also extra information as well. Ensure
|
|
# that both types of data are correctly handled.
|
|
#
|
|
for q in self.connection.Win32_DiskQuota ():
|
|
for p in q.properties:
|
|
try:
|
|
getattr (q, p)
|
|
except wmi.x_wmi:
|
|
assert False, "Error getting %s from %s" % (p, q)
|
|
else:
|
|
assert True
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main ()
|