You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ORPA-pyOpenRPA/WPy32-3720/python-3.7.2/Scripts/wmitest.py

715 lines
25 KiB

"""Unit tests for WMI modules
Some tests are optional, since they rely on remote machines and
usernames / passwords. To enable these, copy wmitest.master.ini
to wmitest.ini and set the parameters you have available.
The watcher tests spawn temporary processes and temporary
logical drives. These may get left behind.
"""
#
# TODO:
# - Test for negative timezone
# - Test for share name with embedded single quote
#
import os, sys
import datetime
# Python 2 / Python 3 compatibility: try the Python 2 module names first and
# fall back to the Python 3 ones, binding both to the Python 2 spelling so the
# rest of the file can use a single name.
try:
    import ConfigParser
except ImportError:
    import configparser as ConfigParser
import operator
try:
    import Queue
except ImportError:
    import queue as Queue
try:
    next
except NameError:
    # No next() builtin on this interpreter: emulate it through the
    # iterator's own .next() method.
    def next(iterator):
        "Return the next item of *iterator* by calling its .next() method"
        return iterator.next()
import subprocess
import tempfile
import threading
import time
import unittest
import warnings
import pythoncom
import win32api
import win32con
import win32file
import wmi
#
# Optional test configuration. Neither ini file has to exist: files which
# cannot be read are skipped, leaving `settings` empty.
#
if sys.version_info >= (3, 2):
    # SafeConfigParser is only a deprecated alias of ConfigParser from
    # Python 3.2 onwards and was removed altogether in Python 3.12.
    ini = ConfigParser.ConfigParser()
else:
    ini = ConfigParser.SafeConfigParser()
ini.read(["wmitest.master.ini", "wmitest.ini"])
settings = {}
if ini.has_section("settings"):
    settings.update(ini.items("settings"))
# Comma-separated list; blank entries are dropped so that a missing or empty
# "excludes" setting gives [] rather than [''].
excludes = [
    name.strip()
    for name in settings.get("excludes", "").split(",")
    if name.strip()
]

#
# Parameter values which the connection / moniker tests iterate over.
# None stands for "do not pass this argument".
#
COMPUTERS = [None, "."]
if "machine" in settings:
    COMPUTERS.append(settings['machine'])
IMPERSONATION_LEVELS = [None, "identify", "impersonate", "delegate"]
AUTHENTICATION_LEVELS = [None, "default", "none", "connect", "call", "pkt", "pktintegrity", "pktprivacy"]
AUTHORITIES = [None]
if set(["domain", "machine"]) <= set(settings):
    #~ AUTHORITIES.append ("kerberos:%s" % settings['domain'])
    AUTHORITIES.append("ntlmdomain:%s" % settings['domain'])
PRIVILEGES = [None, ['security', '!shutdown']]
NAMESPACES = [None, "root/cimv2", "default"]
class TestBasicConnections(unittest.TestCase):
    """Exercise the keyword arguments accepted by wmi.WMI ().

    Tests which need a remote machine or credentials only do any work when
    the corresponding keys are present in the module-level `settings`;
    otherwise they emit a warning and pass.
    """

    @staticmethod
    def _has_remote_credentials():
        "Return True if machine, user and password are all configured"
        return set(["machine", "user", "password"]) <= set(settings)

    def test_basic_connection(self):
        "Check that a standard connection works"
        self.assertTrue(wmi.WMI())

    def test_remote_connection(self):
        "Check that a remote connection works, if specified"
        if "machine" in settings:
            self.assertTrue(wmi.WMI(settings['machine']))
        else:
            warnings.warn("Skipping test_remote_connection")

    def test_simple_moniker(self):
        "Check that a simple moniker works"
        self.assertTrue(wmi.WMI(moniker="winmgmts:"))

    def test_moniker_with_class(self):
        "Check that specifying a class in moniker works"
        c0 = wmi.WMI().Win32_ComputerSystem
        c1 = wmi.WMI(moniker="winmgmts:Win32_ComputerSystem")
        self.assertEqual(c0, c1)

    def test_moniker_with_instance(self):
        "Check that specifying an instance in the moniker works"
        # Only the first computer-system instance is needed.
        c0 = next(iter(wmi.WMI().Win32_ComputerSystem()))
        c1 = wmi.WMI(moniker='winmgmts:Win32_ComputerSystem.Name="%s"' % c0.Name)
        self.assertEqual(c0, c1)

    def test_impersonation_levels(self):
        "Check that specifying an impersonation level works"
        for impersonation in IMPERSONATION_LEVELS:
            self.assertTrue(wmi.WMI(impersonation_level=impersonation))

    def test_authentication_levels(self):
        "Check that specifying an authentication level works"
        for authentication in AUTHENTICATION_LEVELS:
            try:
                c = wmi.WMI(authentication_level=authentication)
            except wmi.x_access_denied:
                # Access denied is reported but not treated as a failure.
                warnings.warn("Access denied for authentication level %s" % authentication)
            else:
                self.assertTrue(c)

    def test_authority(self):
        "Check that specifying an authority works"
        for authority in AUTHORITIES:
            self.assertTrue(wmi.WMI(authority=authority))

    def test_privileges(self):
        "Check that specifying privileges works"
        for privileges in PRIVILEGES:
            self.assertTrue(wmi.WMI(privileges=privileges))

    def test_namespace(self):
        "Check that specifying a namespace works"
        for namespace in NAMESPACES:
            self.assertTrue(wmi.WMI(namespace=namespace))

    def test_suffix(self):
        "Check that a suffix returns the class of that name"
        self.assertEqual(
            wmi.WMI(namespace="DEFAULT", suffix="StdRegProv"),
            wmi.WMI(namespace="DEFAULT").StdRegProv
        )

    def test_user_password(self):
        "Check that username & password are passed through for a remote connection"
        if not self._has_remote_credentials():
            warnings.warn("Skipping test_user_password because no machine, user or password")
            return
        self.assertTrue(
            wmi.WMI(
                computer=settings['machine'],
                user=settings['user'],
                password=settings['password']
            )
        )

    def test_too_much_authentication(self):
        "Check that user/password plus privs / suffix raises exception"
        self.assertRaises(wmi.x_wmi_authentication, wmi.WMI, computer='***', user="***", password="***", privileges=["***"])
        self.assertRaises(wmi.x_wmi_authentication, wmi.WMI, computer='***', user="***", password="***", suffix="***")

    def test_user_password_with_impersonation_level(self):
        "Check that an impersonation level works with a username / password"
        if not self._has_remote_credentials():
            warnings.warn("Skipping test_user_password_with_impersonation_level because no machine, user or password")
            return
        self.assertTrue(
            wmi.WMI(
                computer=settings['machine'],
                user=settings['user'],
                password=settings['password'],
                impersonation_level="impersonate"
            )
        )

    def test_user_password_with_invalid_impersonation_level(self):
        "Check that an invalid impersonation level raises x_wmi_authentication"
        if not self._has_remote_credentials():
            warnings.warn("Skipping test_user_password_with_invalid_impersonation_level because no machine, user or password")
            return
        self.assertRaises(
            wmi.x_wmi_authentication,
            wmi.WMI,
            computer=settings['machine'],
            user=settings['user'],
            password=settings['password'],
            impersonation_level="***"
        )

    def test_user_password_with_authentication_level(self):
        "Check that an authentication level works with a username / password"
        if not self._has_remote_credentials():
            warnings.warn("Skipping test_user_password_with_authentication_level because no machine, user or password")
            return
        self.assertTrue(
            wmi.WMI(
                computer=settings['machine'],
                user=settings['user'],
                password=settings['password'],
                authentication_level="pktIntegrity"
            )
        )

    def test_user_password_with_invalid_authentication_level(self):
        "Check that an invalid authentication level raises x_wmi_authentication"
        if not self._has_remote_credentials():
            warnings.warn("Skipping test_user_password_with_invalid_authentication_level because no machine, user or password")
            return
        self.assertRaises(
            wmi.x_wmi_authentication,
            wmi.WMI,
            computer=settings['machine'],
            user=settings['user'],
            password=settings['password'],
            authentication_level="***"
        )

    def test_local_user_password(self):
        "Check that user/password for local connection raises exception"
        self.assertRaises(wmi.x_wmi_authentication, wmi.WMI, user="***", password="***")

    def test_find_classes(self):
        "Check ability to switch class scan on and off"
        self.assertTrue(wmi.WMI(find_classes=True)._classes)
        self.assertFalse(wmi.WMI(find_classes=False)._classes)

    def test_find_classes_false(self):
        "By default, don't scan for classes but load them on demand"
        self.assertFalse(wmi.WMI()._classes)
        self.assertTrue(wmi.WMI().classes)

    def test_classes_acts_as_list(self):
        "Check that the classes attribute offers the list method .index"
        self.assertTrue(wmi.WMI().classes.index)

    def test_classes_acts_as_dict(self):
        "Check that the classes attribute offers the dict method .keys"
        self.assertTrue(wmi.WMI().classes.keys)
class TestThreadedConnection(unittest.TestCase):
    """Check WMI connections made from threads other than the main one.

    Each worker reports its outcome through a queue as a single boolean:
    True means the behaviour under test was observed.
    """

    def test_initialised_thread(self):
        """A WMI connection in a thread which has been initialised for COM
        should succeed.
        """
        def f(q):
            pythoncom.CoInitialize()
            try:
                try:
                    wmi.WMI()
                except:
                    # Deliberate catch-all: the main thread blocks on the
                    # queue, so a result must be posted whatever went wrong.
                    q.put(False)
                else:
                    q.put(True)
            finally:
                pythoncom.CoUninitialize()

        q = Queue.Queue()
        worker = threading.Thread(target=f, args=(q,))
        worker.start()
        try:
            self.assertTrue(q.get())
        finally:
            # Don't let the worker outlive the test.
            worker.join()

    def test_uninitialised_thread(self):
        """A WMI connection in a thread which has not been initialised
        for COM should fail with a wmi-specific exception.
        """
        def f(q):
            try:
                wmi.WMI()
            except wmi.x_wmi_uninitialised_thread:
                q.put(True)
            except:
                # Deliberate catch-all: see test_initialised_thread.
                q.put(False)
            else:
                # Connecting without COM initialisation must not succeed.
                q.put(False)

        q = Queue.Queue()
        worker = threading.Thread(target=f, args=(q,))
        worker.start()
        try:
            self.assertTrue(q.get())
        finally:
            worker.join()
class TestMoniker(unittest.TestCase):
    "Check monikers built by wmi.construct_moniker"

    def test_moniker(self):
        """Look at all possible options for moniker construction and pass
        them through to a WMI connector
        """
        for computer in COMPUTERS:
            # Authorities other than None are only tried against a
            # configured remote machine.
            if computer in (None, "."):
                local_authorities = [None]
            else:
                local_authorities = AUTHORITIES
            for impersonation_level in IMPERSONATION_LEVELS:
                # NOTE(review): authentication_level is iterated over but is
                # never passed to construct_moniker, so each combination is
                # simply repeated once per level. Confirm whether it was
                # meant to be passed before changing this.
                for authentication_level in AUTHENTICATION_LEVELS:
                    for authority in local_authorities:
                        for privileges in PRIVILEGES:
                            for namespace in NAMESPACES:
                                moniker = wmi.construct_moniker(
                                    computer=computer,
                                    impersonation_level=impersonation_level,
                                    authority=authority,
                                    privileges=privileges,
                                    namespace=namespace
                                )
                                self.assertTrue(wmi.WMI(moniker=moniker), "Moniker failed: %s" % moniker)

    def test_moniker_root_namespace(self):
        "Check that namespace is prefixed by root if needed"
        self.assertEqual(wmi.construct_moniker(namespace="default"), "winmgmts:root/default")
        self.assertEqual(wmi.construct_moniker(namespace="root/default"), "winmgmts:root/default")
class TestFunctions(unittest.TestCase):
    "Check the module-level helper functions of wmi"

    # Pairs of (time-tuple, time-string) used for the conversion tests in
    # both directions.
    times = [
        ((2000, 1, 1), "20000101******.******+***"),
        ((2000, 1, 1, 10, 0, 0), "20000101100000.******+***"),
        ((2000, 1, 1, 10, 0, 0, 100), "20000101100000.000100+***"),
        ((2000, 1, 1, 10, 0, 0, 100, "GMT"), "20000101100000.000100+GMT")
    ]

    def test_signed_to_unsigned(self):
        "Check conversion of signed 32-bit values to their unsigned equivalent"
        tests = [
            (0, 0),
            (-1, 0xffffffff),
            (+1, 1),
            (0x7fffffff, 0x7fffffff),
            (-0x7fffffff, 0x80000001)
        ]
        for signed, unsigned in tests:
            self.assertEqual(wmi.signed_to_unsigned(signed), unsigned)

    def test_from_1601(self):
        "Check conversion from 100-ns intervals since 1601 (!)"
        self.assertEqual(wmi.from_1601(0), datetime.datetime(1601, 1, 1))
        # One day's worth of 100-ns intervals
        self.assertEqual(wmi.from_1601(24 * 60 * 60 * 10 * 1000 * 1000), datetime.datetime(1601, 1, 2))

    def test_from_time(self):
        "Check conversion from time-tuple to time-string"
        for t, s in self.times:
            self.assertEqual(wmi.from_time(*t), s)

    def test_to_time(self):
        "Check conversion from time-string to time-tuple"
        for t, s in self.times:
            # Pad the expected tuple out to 8 items with None
            expected = tuple(list(t) + ([None] * 8))[:8]
            self.assertEqual(wmi.to_time(s), expected)

    def test_get_wmi_type(self):
        "Check that namespace, class & instance are identified correctly"
        self.assertEqual(wmi.get_wmi_type(wmi.WMI()), "namespace")
        self.assertEqual(wmi.get_wmi_type(wmi.WMI().Win32_ComputerSystem), "class")
        for i in wmi.WMI().Win32_ComputerSystem():
            self.assertEqual(wmi.get_wmi_type(i), "instance")

    def test_registry(self):
        """Convenience Registry function is identical to picking
        the StdRegProv class out of the DEFAULT namespace"""
        self.assertEqual(wmi.Registry(), wmi.WMI(namespace="DEFAULT").StdRegProv)
class TestWMI(unittest.TestCase):
    """Common fixture for tests which need a WMI connection.

    Provides `self.connection`, a root/cimv2 connection created with
    find_classes=False, and `self.logical_disks`, the set of
    Win32_LogicalDisk instances found when the test started.
    """

    def setUp(self):
        self.connection = wmi.WMI(namespace="root/cimv2", find_classes=False)
        self.logical_disks = set(self.connection.Win32_LogicalDisk())
class TestNamespace(TestWMI):
    "Check the methods and attributes of a WMI namespace (connection)"

    def test_subclasses_of_simple(self):
        "Check that subclasses_of with no arguments includes a well-known class"
        self.assertTrue("Win32_ComputerSystem" in self.connection.subclasses_of())

    def test_subclasses_of_subtree(self):
        "Check that subclasses_of can be restricted to a root class"
        self.assertTrue("Win32_Desktop" in self.connection.subclasses_of("CIM_Setting"))

    def test_subclasses_of_pattern(self):
        "Check that subclasses_of can be filtered by a pattern"
        self.assertTrue(
            set(["Win32_LogicalDisk", "Win32_MappedLogicalDisk"]) <=
            set(self.connection.subclasses_of("CIM_LogicalDevice", "Win32_.*Disk"))
        )

    def test_instances(self):
        "Check that instances () matches calling the class directly"
        self.assertEqual(self.logical_disks, set(self.connection.instances("Win32_LogicalDisk")))

    def test_new(self):
        "Check this is an alias for the new method of the equivalent class"
        self.assertEqual(self.connection.new("Win32_Process")._instance_of, self.connection.Win32_Process)

    def test_query(self):
        "Check that a WQL query matches calling the class directly"
        self.assertEqual(self.logical_disks, set(self.connection.query("SELECT * FROM Win32_LogicalDisk")))

    def test_ipython_attributes_with_find_classes(self):
        "Check that _getAttributeNames lists the classes not starting with __"
        connection = wmi.WMI(find_classes=True)
        self.assertEqual(
            sorted(connection._getAttributeNames()),
            sorted(i for i in connection.classes if not i.startswith("__"))
        )

    def test_getattr(self):
        "Check that WMI classes are returned by attribute access on their namespace"
        connection = wmi.WMI(find_classes=True)
        # A handful of classes is enough.
        for c in list(connection.classes)[:5]:
            wmi_class = getattr(connection, c)
            self.assertTrue(isinstance(wmi_class, wmi._wmi_class))
            self.assertEqual(wmi_class._class_name, c)

    def test_watch_for(self):
        """Check that the watch_for method returns a watcher. The watcher itself
        will be tested elsewhere.
        """
        watcher = self.connection.watch_for(
            wmi_class="Win32_Process"
        )
        self.assertTrue(isinstance(watcher, wmi._wmi_watcher))
class TestClass(TestWMI):
    "Check the methods and attributes of a WMI class wrapper"

    def test_class_from_namespace(self):
        "Check that a class picked from a namespace refers back to it"
        self.assertTrue(self.connection.Win32_ComputerSystem._namespace is self.connection)

    def test_class_without_namespace(self):
        "Check that a class wrapped without a namespace still ends up with one"
        wmi_class = wmi.GetObject("winmgmts:Win32_ComputerSystem")
        self.assertTrue(wmi._wmi_class(None, wmi_class)._namespace)

    def test_query(self):
        "Check that class.query () matches the equivalent WQL query"
        self.assertEqual(
            set(self.connection.Win32_ComputerSystem.query()),
            set(self.connection.query("SELECT * FROM Win32_ComputerSystem"))
        )

    def test_query_with_where(self):
        "Check that keyword arguments restrict the instances returned"
        this_drive = os.getcwd()[:2]
        for drive in self.connection.Win32_LogicalDisk(Name=this_drive):
            self.assertEqual(drive.Name, this_drive)

    def test_query_with_fields(self):
        "Check that only the requested fields are available on the result"
        this_drive = os.getcwd()[:2]
        properties = set(["MediaType"])
        # Name is used below as the example of a field which was not requested.
        self.assertTrue("Name" not in properties)
        for drive in self.connection.Win32_LogicalDisk(properties, Name=this_drive):
            self.assertEqual(set(drive.properties), set(properties))
            self.assertTrue(drive.MediaType)
            self.assertRaises(AttributeError, getattr, drive, "Name")

    def test_watch_for(self):
        """Check that the watch_for method returns a watcher. The watcher itself
        will be tested elsewhere.
        """
        watcher = self.connection.Win32_Process.watch_for()
        self.assertTrue(isinstance(watcher, wmi._wmi_watcher))

    def test_instances(self):
        "Check that instances () matches calling the class directly"
        self.assertEqual(
            set(self.connection.Win32_LogicalDisk()),
            set(self.connection.Win32_LogicalDisk.instances())
        )

    def test_new(self):
        "Check that new () returns an instance of the class it was called on"
        process = self.connection.Win32_Process.new()
        self.assertEqual(wmi.get_wmi_type(process), "instance")
        self.assertEqual(process._instance_of, self.connection.Win32_Process)
class TestWatcher(TestWMI):
    """Check WMI event watchers.

    The drive tests temporarily map a spare drive letter onto the directory
    holding this file so that a logical-disk event is raised.
    """

    def new_letter(self):
        """Return a drive letter ("X:") not used by any current logical disk.

        Raises KeyError if every letter from A: to Z: is in use.
        """
        candidates = set("%s:" % chr(i) for i in range(ord('A'), 1 + ord('Z')))
        in_use = (d.DeviceID for d in self.connection.Win32_LogicalDisk())
        return candidates.difference(in_use).pop()

    @staticmethod
    def create(new_letter):
        "Map *new_letter* onto this file's directory for two seconds"
        print("about to create drive with letter %s" % new_letter)
        here = os.path.dirname(os.path.abspath(__file__))
        win32file.DefineDosDevice(0, new_letter, here)
        try:
            #
            # This sleep is needed for the WMI pollster to react
            #
            time.sleep(2)
        finally:
            # NOTE(review): flag 2 is presumably DDD_REMOVE_DEFINITION, ie
            # this undoes the mapping made above - confirm.
            win32file.DefineDosDevice(2, new_letter, here)

    def _assert_drive_detected(self, watcher, new_letter):
        "Map *new_letter* from a timer thread and check that *watcher* reports it"
        t = threading.Timer(2, self.create, (new_letter,))
        t.start()
        try:
            found_disk = watcher(timeout_ms=20000)
            self.assertTrue(isinstance(found_disk, wmi._wmi_object))
            self.assertEqual(found_disk.Caption, new_letter)
        finally:
            # Always wait for the timer thread, even when an assertion or
            # the watcher fails, so the temporary drive mapping is removed
            # before the test returns.
            t.join()

    def test_creation(self):
        "Check that a Creation watcher reports a newly-mapped drive"
        try:
            new_letter = self.new_letter()
        except KeyError:
            warnings.warn("Unable to find a spare drive letter to map.")
            return
        watcher = self.connection.Win32_LogicalDisk.watch_for(
            notification_type="Creation",
            DeviceID=new_letter
        )
        self._assert_drive_detected(watcher, new_letter)

    def test_event_with_no_params(self):
        "Check that a watcher created with no parameters reports a newly-mapped drive"
        try:
            new_letter = self.new_letter()
        except KeyError:
            warnings.warn("Unable to find a spare drive letter to map.")
            return
        watcher = self.connection.Win32_LogicalDisk.watch_for()
        self._assert_drive_detected(watcher, new_letter)

    def test_valid_notification_types(self):
        "Check that each known notification type yields a watcher"
        for notification_type in ['operation', 'modification', 'creation', 'deletion']:
            self.assertTrue(self.connection.Win32_LogicalDisk.watch_for(notification_type=notification_type))

    def test_invalid_notification_types(self):
        "Check that an unknown notification type raises x_wmi"
        self.assertRaises(wmi.x_wmi, self.connection.Win32_LogicalDisk.watch_for, notification_type="***")

    def do_not_test_extrinsic_event(self):
        #
        # This doesn't seem implementable at the moment
        # as a test. I can't find a reproducible extrinsic
        # event except for Win32_DeviceChangeEvent and that
        # one would require someone to, eg, plug in / unplug
        # a USB stick.
        #
        # It looks as though Win32_ProcessStartTrace should work
        # and it does on my laptop; just not on my desktop.
        #
        # (The name deliberately lacks the test_ prefix so that the test
        # runner does not pick it up.)
        #
        def _create(queue):
            queue.put(subprocess.Popen([sys.executable, "-c", "import time; time.sleep (10)"]))

        watcher = self.connection.Win32_ProcessStartTrace.watch_for(
            fields=["*"]##,
            #~ ProcessName=os.path.basename (sys.executable)
        )
        q = Queue.Queue()
        t = threading.Timer(2, _create, (q,))
        try:
            t.start()
            found_process = watcher(timeout_ms=20000)
            spawned_process = q.get_nowait()
            self.assertTrue(isinstance(found_process, wmi._wmi_event))
            self.assertEqual(int(found_process.ProcessID), spawned_process.pid)
        finally:
            t.cancel()
class TestMethods(TestWMI):
    "Check WMI methods, called both from classes and from instances"

    @staticmethod
    def _read_text(path):
        "Return the contents of the file at *path*, closing it afterwards"
        handle = open(path)
        try:
            return handle.read()
        finally:
            handle.close()

    @staticmethod
    def _remove_dir(path):
        "Best-effort removal of the temporary directory *path*"
        import shutil
        shutil.rmtree(path, ignore_errors=True)

    def test_exists(self):
        "Check that a well-known method is available by attribute"
        self.assertTrue(self.connection.Win32_Process.Create)

    def test_params(self):
        "Check that the names and arrayness of params are picked up when not arrays"
        self.assertEqual(
            [(n, False) for n in ["CommandLine", "CurrentDirectory", "ProcessStartupInformation"]],
            self.connection.Win32_Process.Create.in_parameter_names
        )
        self.assertEqual(
            [("ProcessId", False), ("ReturnValue", False)],
            self.connection.Win32_Process.Create.out_parameter_names
        )

    def test_positional_params(self):
        "Check that method parameters can be passed by position"
        working_dir = tempfile.mkdtemp()
        try:
            filename = "abc.txt"
            contents = str(datetime.datetime.now())
            handle, result = self.connection.Win32_Process.Create(
                "cmd /c echo %s > %s" % (contents, filename),
                working_dir,
                self.connection.Win32_ProcessStartup.new(ShowWindow=0)
            )
            # Presumably gives the spawned command time to write its file.
            time.sleep(0.5)
            self.assertEqual(self._read_text(os.path.join(working_dir, filename)), contents + " \n")
        finally:
            self._remove_dir(working_dir)

    def test_named_params(self):
        "Check that method parameters can be passed by name, in any order"
        working_dir = tempfile.mkdtemp()
        try:
            filename = "abc.txt"
            contents = str(datetime.datetime.now())
            handle, result = self.connection.Win32_Process.Create(
                ProcessStartupInformation=self.connection.Win32_ProcessStartup.new(ShowWindow=0),
                CurrentDirectory=working_dir,
                CommandLine="cmd /c echo %s > %s" % (contents, filename)
            )
            # Presumably gives the spawned command time to write its file.
            time.sleep(0.5)
            self.assertEqual(self._read_text(os.path.join(working_dir, filename)), contents + " \n")
        finally:
            self._remove_dir(working_dir)

    def test_in_params_with_array(self):
        "Check that the names and arrayness of params are picked up when arrays"
        self.assertEqual(
            [("DNSServerSearchOrder", True)],
            self.connection.Win32_NetworkAdapterConfiguration.SetDNSServerSearchOrder.in_parameter_names
        )

    def test_instance_methods_are_distinct(self):
        """Check that the methods of difference instances of a class are distinct.
        This caused a problem when calling .Terminate on one process killed another.
        """
        methods = [d.Reset for d in self.logical_disks]
        # Compare each method with its neighbour
        for this_method, next_method in zip(methods, methods[1:]):
            self.assertNotEqual(this_method, next_method)

    def test_call_from_class(self):
        "Check that a method can be called from a class"
        self.assertTrue(self.connection.Win32_Process.Create(
            CommandLine=sys.executable + " -c pass",
            ProcessStartupInformation=self.connection.Win32_ProcessStartup.new(ShowWindow=0)
        ))

    def test_call_from_instance(self):
        "Check that a method can be called from an instance"
        handle, _ = self.connection.Win32_Process.Create(
            CommandLine=sys.executable,
            ProcessStartupInformation=self.connection.Win32_ProcessStartup.new(ShowWindow=0)
        )
        # Start from a failing value so that the test fails if the new
        # process is not found and Terminate is never called.
        result = 1
        for p in self.connection.Win32_Process(Handle=handle):
            result, = p.Terminate()
        self.assertEqual(result, 0)
class TestProperties(TestWMI):
    "Check access to the properties of WMI instances"

    def test_access(self):
        "Check that all properties are available as attributes"
        # Any one logical disk will do
        d = next(iter(self.logical_disks))
        for p in d.ole_object.Properties_:
            self.assertEqual(p.Value, getattr(d, p.Name))

    def test_attribute_passthrough(self):
        "Check that unknown attributes are passed through to the underlying object"
        d = next(iter(self.logical_disks))
        #
        # Can't rely on the COM Objects testing identical or equal;
        # have to check their values and their emptiness.
        #
        self.assertTrue(d.Properties_)
        self.assertTrue(d.ole_object.Properties_)
        self.assertEqual(
            [p.Value for p in d.Properties_],
            [p.Value for p in d.ole_object.Properties_]
        )

    def test_settable(self):
        "Check that a writeable property can be written"
        # Use the current time as a variable name unlikely to exist already
        name = str(time.time()).split(".")[0]
        old_value = "***"
        new_value = "!!!"
        username = win32api.GetUserNameEx(win32con.NameSamCompatible)
        self.assertTrue(not self.connection.Win32_Environment(Name=name, UserName=username))
        self.connection.Win32_Environment.new(Name=name, UserName=username, VariableValue=old_value).put()
        try:
            for envvar in self.connection.Win32_Environment(Name=name, UserName=username):
                self.assertEqual(envvar.VariableValue, old_value)
                envvar.VariableValue = new_value
            for envvar in self.connection.Win32_Environment(Name=name, UserName=username):
                self.assertEqual(envvar.VariableValue, new_value)
        finally:
            # Clean up whichever assertion failed. NOTE(review): setting the
            # value to None presumably clears the variable - confirm.
            for envvar in self.connection.Win32_Environment(Name=name, UserName=username):
                envvar.VariableValue = None
class TestInstances(TestWMI):
    "Check the behaviour of WMI instance wrappers"

    def test_hashable(self):
        "Ensure instances are hashable so can be used in a set/dict"
        self.assertTrue(dict.fromkeys(self.logical_disks))

    def test_equalable(self):
        "Ensure instances compare equal"
        self.assertEqual(self.logical_disks, self.logical_disks)

    def test_sortable(self):
        "Ensure instances sort by full path/key"
        self.assertEqual(
            sorted(self.logical_disks),
            sorted(self.logical_disks, key=operator.attrgetter("DeviceID"))
        )

    def test_references(self):
        "Ensure that associations are special-cased to return wrapped objects"
        # Any one logical disk will do
        d = next(iter(self.logical_disks))
        for r in d.references("Win32_LogicalDiskRootDirectory"):
            self.assertTrue(r.is_association)
            self.assertEqual(r.GroupComponent, d)
            self.assertTrue(isinstance(r.GroupComponent, wmi._wmi_object))
            self.assertTrue(isinstance(r.PartComponent, wmi._wmi_object))

    def test_associators(self):
        "Ensure that associators are returned by association / result"
        # Pick the logical disk on which this file lives
        for d in self.logical_disks:
            if d.DeviceID == os.path.abspath(__file__)[:2]:
                break
        else:
            raise RuntimeError("Unable to find the logical drive corresponding to this file")
        root_dir = d.associators(wmi_association_class="Win32_LogicalDiskRootDirectory")[0]
        self.assertEqual(root_dir.Name.lower(), d.Name.lower() + "\\")
        root_dir = d.associators(wmi_result_class="Win32_Directory")[0]
        self.assertEqual(root_dir.Name.lower(), d.Name.lower() + "\\")

    def test_derivation(self):
        "Check that derivation mimics WMI-provided Derivation_ property"
        d = next(iter(self.logical_disks))
        self.assertEqual(d.derivation(), d.ole_object.Derivation_)

    def test_keys(self):
        "Check that the readonly keys property returns the keys for an object"
        self.assertEqual(self.connection.Win32_LogicalDisk.keys, ['DeviceID'])
        self.assertEqual(next(iter(self.logical_disks)).keys, ['DeviceID'])
class TestInstanceCreation(TestWMI):
    "Check the creation of new WMI instances"

    def test_create_instance(self):
        "Check that new () on a class returns a wrapped WMI object"
        self.assertTrue(isinstance(self.connection.Win32_ProcessStartup.new(ShowWindow=2), wmi._wmi_object))
class TestAssociations(TestWMI):
    "Check the handling of WMI association classes"

    def test_all_properties_available(self):
        "Check that every property of an association can be read"
        #
        # An association can contain not only the associated
        # classes but also extra information as well. Ensure
        # that both types of data are correctly handled.
        #
        for q in self.connection.Win32_DiskQuota():
            for p in q.properties:
                try:
                    getattr(q, p)
                except wmi.x_wmi:
                    # self.fail rather than a bare assert, which would be
                    # stripped when running under python -O
                    self.fail("Error getting %s from %s" % (p, q))
# Run every test in this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()