#PythonX32_add_module_keyboard

dev-linux
Ivan Maslov 6 years ago
parent 39c677c805
commit bbe84e1ef5

@ -3,7 +3,7 @@
Open source RPA platform (Coming soon Q2 2019) for Windows.
Dependencies
* Python 3 x32 [psutil, pywinauto, wmi, PIL, Keyboard, pyautogui, win32api (pywin32)]
* Python 3 x32 [psutil, pywinauto, wmi, PIL, keyboard, pyautogui, win32api (pywin32)]
* Python 3 x64
* pywinauto (Windows GUI automation)
* Semantic UI CSS framework

@ -0,0 +1,126 @@
Metadata-Version: 2.1
Name: keyboard
Version: 0.13.3
Summary: Hook and simulate keyboard events on Windows and Linux
Home-page: https://github.com/boppreh/keyboard
Author: BoppreH
Author-email: boppreh@gmail.com
License: MIT
Keywords: keyboard hook simulate hotkey
Platform: UNKNOWN
Classifier: Development Status :: 4 - Beta
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: Microsoft :: Windows
Classifier: Operating System :: Unix
Classifier: Operating System :: MacOS :: MacOS X
Classifier: Programming Language :: Python :: 2
Classifier: Programming Language :: Python :: 3
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: Utilities
Requires-Dist: pyobjc; sys_platform == "darwin"
keyboard
========
Take full control of your keyboard with this small Python library. Hook
global events, register hotkeys, simulate key presses and much more.
Features
--------
- **Global event hook** on all keyboards (captures keys regardless of
focus).
- **Listen** and **send** keyboard events.
- Works with **Windows** and **Linux** (requires sudo), with
experimental **OS X** support (thanks @glitchassassin!).
- **Pure Python**, no C modules to be compiled.
- **Zero dependencies**. Trivial to install and deploy, just copy the
files.
- **Python 2 and 3**.
- Complex hotkey support (e.g. ``ctrl+shift+m, ctrl+space``) with
controllable timeout.
- Includes **high level API** (e.g. `record <#keyboard.record>`__ and
`play <#keyboard.play>`__,
`add_abbreviation <#keyboard.add_abbreviation>`__).
- Maps keys as they actually are in your layout, with **full
internationalization support** (e.g. ``Ctrl+ç``).
- Events automatically captured in separate thread, doesnt block main
program.
- Tested and documented.
- Doesnt break accented dead keys (Im looking at you, pyHook).
- Mouse support available via project
`mouse <https://github.com/boppreh/mouse>`__ (``pip install mouse``).
Usage
-----
Install the `PyPI package <https://pypi.python.org/pypi/keyboard/>`__:
::
pip install keyboard
or clone the repository (no installation required, source files are
sufficient):
::
git clone https://github.com/boppreh/keyboard
or `download and extract the
zip <https://github.com/boppreh/keyboard/archive/master.zip>`__ into
your project folder.
Then check the `API docs
below <https://github.com/boppreh/keyboard#api>`__ to see what features
are available.
Example
-------
.. code:: py
import keyboard
keyboard.press_and_release('shift+s, space')
keyboard.write('The quick brown fox jumps over the lazy dog.')
keyboard.add_hotkey('ctrl+shift+a', print, args=('triggered', 'hotkey'))
# Press PAGE UP then PAGE DOWN to type "foobar".
keyboard.add_hotkey('page up, page down', lambda: keyboard.write('foobar'))
# Blocks until you press esc.
keyboard.wait('esc')
# Record events until 'esc' is pressed.
recorded = keyboard.record(until='esc')
# Then replay back at three times the speed.
keyboard.play(recorded, speed_factor=3)
# Type @@ then press space to replace with abbreviation.
keyboard.add_abbreviation('@@', 'my.long.email@example.com')
# Block forever, like `while True`.
keyboard.wait()
Known limitations:
------------------
- Events generated under Windows dont report device id
(``event.device == None``).
`#21 <https://github.com/boppreh/keyboard/issues/21>`__
- Media keys on Linux may appear nameless (scan-code only) or not at
all. `#20 <https://github.com/boppreh/keyboard/issues/20>`__
- Key suppression/blocking only available on Windows.
`#22 <https://github.com/boppreh/keyboard/issues/22>`__
- To avoid depending on X, the Linux parts reads raw device files
(``/dev/input/input*``) but this requries root.
- Other applications, such as some games, may register hooks that
swallow all key events. In this case ``keyboard`` will be unable to
report events.
- This program makes no attempt to hide itself, so dont use it for
keyloggers or online gaming bots. Be responsible.

@ -0,0 +1,39 @@
keyboard-0.13.3.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4
keyboard-0.13.3.dist-info/METADATA,sha256=EKtsFJrqlMzkHtc2wfjLNwRb55SwBUpMRApc7JSOGqI,4138
keyboard-0.13.3.dist-info/RECORD,,
keyboard-0.13.3.dist-info/WHEEL,sha256=CG516Me1TEyQqkQMqQpnDadrWi7ujPk88poSAu3OliQ,116
keyboard-0.13.3.dist-info/top_level.txt,sha256=c5wzGrDqCQG1WTTAnxmIHqUWNvjPR9E1TLvrIfpzE-E,9
keyboard/__init__.py,sha256=Lkeg0f2xsfd7T3RfeLL2WTan_8j9opLVu17OQCLt8do,43556
keyboard/__main__.py,sha256=hT5QQiFEKZ7U9009Ox0JMONeON4vzhHwS7X-J5KU38M,378
keyboard/__pycache__/__init__.cpython-37.pyc,,
keyboard/__pycache__/__main__.cpython-37.pyc,,
keyboard/__pycache__/_canonical_names.cpython-37.pyc,,
keyboard/__pycache__/_darwinkeyboard.cpython-37.pyc,,
keyboard/__pycache__/_darwinmouse.cpython-37.pyc,,
keyboard/__pycache__/_generic.cpython-37.pyc,,
keyboard/__pycache__/_keyboard_event.cpython-37.pyc,,
keyboard/__pycache__/_keyboard_tests.cpython-37.pyc,,
keyboard/__pycache__/_mouse_event.cpython-37.pyc,,
keyboard/__pycache__/_mouse_tests.cpython-37.pyc,,
keyboard/__pycache__/_nixcommon.cpython-37.pyc,,
keyboard/__pycache__/_nixkeyboard.cpython-37.pyc,,
keyboard/__pycache__/_nixmouse.cpython-37.pyc,,
keyboard/__pycache__/_winkeyboard.cpython-37.pyc,,
keyboard/__pycache__/_winmouse.cpython-37.pyc,,
keyboard/__pycache__/_xlibkeyboard.cpython-37.pyc,,
keyboard/__pycache__/mouse.cpython-37.pyc,,
keyboard/_canonical_names.py,sha256=_V7K9lmF5z0eMbdMHr85KWoE_SC79zoeYfYB3n914pE,29474
keyboard/_darwinkeyboard.py,sha256=nmDPlMPM2iE5ioZJpZf40GJgdFWta_xKnbwIv3efzAM,18124
keyboard/_darwinmouse.py,sha256=YoD2RmGEnCoptlKn9Tbzg3k9pBVsosetxxQ5Fo3fb8E,6515
keyboard/_generic.py,sha256=94SXyZp6IHAJ0gZJSpBB7hy5kVlYJbtYrsHw71CbOu0,2176
keyboard/_keyboard_event.py,sha256=Spsn28-Tbv50fGB8YBVVvzdaYP3Qd8DDJ_OHrMltskM,1630
keyboard/_keyboard_tests.py,sha256=TKsNk_ZEGGe7RBb48SZp9QG6l9a0M_FeQ_vyrTQ4x9c,36830
keyboard/_mouse_event.py,sha256=zRQGO6M6nbA-jvhI0yz4HzvQNcScF48PZ9iRegcTVjQ,422
keyboard/_mouse_tests.py,sha256=jeXMAm8NCnM1xfoMacfj0Rqves8Zs7mehZHT8c0weaw,10014
keyboard/_nixcommon.py,sha256=WLzMzpb7SB03Wpohzoh4BbLBZZMre_NHKfRrRdjhb_k,6053
keyboard/_nixkeyboard.py,sha256=jBnwq-yVNb9j67P9OpeR7WBt1J6499VW_Uia3FDFRQo,5882
keyboard/_nixmouse.py,sha256=rgcxW1Y0xljbk_Ljtqc08PLe9F9_jCki-oQUz-LvHcI,3518
keyboard/_winkeyboard.py,sha256=vUjNUBuucbHfZtGbxprFv_kC-ekhPk9bwiIGGhfbu74,20607
keyboard/_winmouse.py,sha256=_8t4jDlnVRqeKr0HvtTbvnD8xwElmpGFtqOlZbIAbwM,5817
keyboard/_xlibkeyboard.py,sha256=7-Lbwvx5U2GsOFoDHmeGHrrpAqpVzx51Xnetl9OG3XE,3652
keyboard/mouse.py,sha256=Eq50w6QnyYAGiJ_Y7ERmU-jQlYO7NT0aSFMkR-Fnwnc,7639

@ -0,0 +1,6 @@
Wheel-Version: 1.0
Generator: bdist_wheel (0.31.1)
Root-Is-Purelib: true
Tag: py2-none-any
Tag: py3-none-any

@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
import keyboard
import fileinput
import json
import sys
def print_event_json(event):
print(event.to_json(ensure_ascii=sys.stdout.encoding != 'utf-8'))
sys.stdout.flush()
keyboard.hook(print_event_json)
parse_event_json = lambda line: keyboard.KeyboardEvent(**json.loads(line))
keyboard.play(parse_event_json(line) for line in fileinput.input())

@ -0,0 +1,442 @@
import ctypes
import ctypes.util
import Quartz
import time
import os
import threading
from AppKit import NSEvent
from ._keyboard_event import KeyboardEvent, KEY_DOWN, KEY_UP
from ._canonical_names import normalize_name
try: # Python 2/3 compatibility
unichr
except NameError:
unichr = chr
Carbon = ctypes.cdll.LoadLibrary(ctypes.util.find_library('Carbon'))
class KeyMap(object):
non_layout_keys = dict((vk, normalize_name(name)) for vk, name in {
# Layout specific keys from https://stackoverflow.com/a/16125341/252218
# Unfortunately no source for layout-independent keys was found.
0x24: 'return',
0x30: 'tab',
0x31: 'space',
0x33: 'delete',
0x35: 'escape',
0x37: 'command',
0x38: 'shift',
0x39: 'capslock',
0x3a: 'option',
0x3b: 'control',
0x3c: 'right shift',
0x3d: 'right option',
0x3e: 'right control',
0x3f: 'function',
0x40: 'f17',
0x48: 'volume up',
0x49: 'volume down',
0x4a: 'mute',
0x4f: 'f18',
0x50: 'f19',
0x5a: 'f20',
0x60: 'f5',
0x61: 'f6',
0x62: 'f7',
0x63: 'f3',
0x64: 'f8',
0x65: 'f9',
0x67: 'f11',
0x69: 'f13',
0x6a: 'f16',
0x6b: 'f14',
0x6d: 'f10',
0x6f: 'f12',
0x71: 'f15',
0x72: 'help',
0x73: 'home',
0x74: 'page up',
0x75: 'forward delete',
0x76: 'f4',
0x77: 'end',
0x78: 'f2',
0x79: 'page down',
0x7a: 'f1',
0x7b: 'left',
0x7c: 'right',
0x7d: 'down',
0x7e: 'up',
}.items())
layout_specific_keys = {}
def __init__(self):
# Virtual key codes are usually the same for any given key, unless you have a different
# keyboard layout. The only way I've found to determine the layout relies on (supposedly
# deprecated) Carbon APIs. If there's a more modern way to do this, please update this
# section.
# Set up data types and exported values:
CFTypeRef = ctypes.c_void_p
CFDataRef = ctypes.c_void_p
CFIndex = ctypes.c_uint64
OptionBits = ctypes.c_uint32
UniCharCount = ctypes.c_uint8
UniChar = ctypes.c_uint16
UniChar4 = UniChar * 4
class CFRange(ctypes.Structure):
_fields_ = [('loc', CFIndex),
('len', CFIndex)]
kTISPropertyUnicodeKeyLayoutData = ctypes.c_void_p.in_dll(Carbon, 'kTISPropertyUnicodeKeyLayoutData')
shiftKey = 0x0200
alphaKey = 0x0400
optionKey = 0x0800
controlKey = 0x1000
kUCKeyActionDisplay = 3
kUCKeyTranslateNoDeadKeysBit = 0
# Set up function calls:
Carbon.CFDataGetBytes.argtypes = [CFDataRef] #, CFRange, UInt8
Carbon.CFDataGetBytes.restype = None
Carbon.CFDataGetLength.argtypes = [CFDataRef]
Carbon.CFDataGetLength.restype = CFIndex
Carbon.CFRelease.argtypes = [CFTypeRef]
Carbon.CFRelease.restype = None
Carbon.LMGetKbdType.argtypes = []
Carbon.LMGetKbdType.restype = ctypes.c_uint32
Carbon.TISCopyCurrentKeyboardInputSource.argtypes = []
Carbon.TISCopyCurrentKeyboardInputSource.restype = ctypes.c_void_p
Carbon.TISCopyCurrentASCIICapableKeyboardLayoutInputSource.argtypes = []
Carbon.TISCopyCurrentASCIICapableKeyboardLayoutInputSource.restype = ctypes.c_void_p
Carbon.TISGetInputSourceProperty.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
Carbon.TISGetInputSourceProperty.restype = ctypes.c_void_p
Carbon.UCKeyTranslate.argtypes = [ctypes.c_void_p,
ctypes.c_uint16,
ctypes.c_uint16,
ctypes.c_uint32,
ctypes.c_uint32,
OptionBits, # keyTranslateOptions
ctypes.POINTER(ctypes.c_uint32), # deadKeyState
UniCharCount, # maxStringLength
ctypes.POINTER(UniCharCount), # actualStringLength
UniChar4]
Carbon.UCKeyTranslate.restype = ctypes.c_uint32
# Get keyboard layout
klis = Carbon.TISCopyCurrentKeyboardInputSource()
k_layout = Carbon.TISGetInputSourceProperty(klis, kTISPropertyUnicodeKeyLayoutData)
if k_layout is None:
klis = Carbon.TISCopyCurrentASCIICapableKeyboardLayoutInputSource()
k_layout = Carbon.TISGetInputSourceProperty(klis, kTISPropertyUnicodeKeyLayoutData)
k_layout_size = Carbon.CFDataGetLength(k_layout)
k_layout_buffer = ctypes.create_string_buffer(k_layout_size) # TODO - Verify this works instead of initializing with empty string
Carbon.CFDataGetBytes(k_layout, CFRange(0, k_layout_size), ctypes.byref(k_layout_buffer))
# Generate character representations of key codes
for key_code in range(0, 128):
# TODO - Possibly add alt modifier to key map
non_shifted_char = UniChar4()
shifted_char = UniChar4()
keys_down = ctypes.c_uint32()
char_count = UniCharCount()
retval = Carbon.UCKeyTranslate(k_layout_buffer,
key_code,
kUCKeyActionDisplay,
0, # No modifier
Carbon.LMGetKbdType(),
kUCKeyTranslateNoDeadKeysBit,
ctypes.byref(keys_down),
4,
ctypes.byref(char_count),
non_shifted_char)
non_shifted_key = u''.join(unichr(non_shifted_char[i]) for i in range(char_count.value))
retval = Carbon.UCKeyTranslate(k_layout_buffer,
key_code,
kUCKeyActionDisplay,
shiftKey >> 8, # Shift
Carbon.LMGetKbdType(),
kUCKeyTranslateNoDeadKeysBit,
ctypes.byref(keys_down),
4,
ctypes.byref(char_count),
shifted_char)
shifted_key = u''.join(unichr(shifted_char[i]) for i in range(char_count.value))
self.layout_specific_keys[key_code] = (non_shifted_key, shifted_key)
# Cleanup
Carbon.CFRelease(klis)
def character_to_vk(self, character):
""" Returns a tuple of (scan_code, modifiers) where ``scan_code`` is a numeric scan code
and ``modifiers`` is an array of string modifier names (like 'shift') """
for vk in self.non_layout_keys:
if self.non_layout_keys[vk] == character.lower():
return (vk, [])
for vk in self.layout_specific_keys:
if self.layout_specific_keys[vk][0] == character:
return (vk, [])
elif self.layout_specific_keys[vk][1] == character:
return (vk, ['shift'])
raise ValueError("Unrecognized character: {}".format(character))
def vk_to_character(self, vk, modifiers=[]):
""" Returns a character corresponding to the specified scan code (with given
modifiers applied) """
if vk in self.non_layout_keys:
# Not a character
return self.non_layout_keys[vk]
elif vk in self.layout_specific_keys:
if 'shift' in modifiers:
return self.layout_specific_keys[vk][1]
return self.layout_specific_keys[vk][0]
else:
# Invalid vk
raise ValueError("Invalid scan code: {}".format(vk))
class KeyController(object):
def __init__(self):
self.key_map = KeyMap()
self.current_modifiers = {
"shift": False,
"caps": False,
"alt": False,
"ctrl": False,
"cmd": False,
}
self.media_keys = {
'KEYTYPE_SOUND_UP': 0,
'KEYTYPE_SOUND_DOWN': 1,
'KEYTYPE_BRIGHTNESS_UP': 2,
'KEYTYPE_BRIGHTNESS_DOWN': 3,
'KEYTYPE_CAPS_LOCK': 4,
'KEYTYPE_HELP': 5,
'POWER_KEY': 6,
'KEYTYPE_MUTE': 7,
'UP_ARROW_KEY': 8,
'DOWN_ARROW_KEY': 9,
'KEYTYPE_NUM_LOCK': 10,
'KEYTYPE_CONTRAST_UP': 11,
'KEYTYPE_CONTRAST_DOWN': 12,
'KEYTYPE_LAUNCH_PANEL': 13,
'KEYTYPE_EJECT': 14,
'KEYTYPE_VIDMIRROR': 15,
'KEYTYPE_PLAY': 16,
'KEYTYPE_NEXT': 17,
'KEYTYPE_PREVIOUS': 18,
'KEYTYPE_FAST': 19,
'KEYTYPE_REWIND': 20,
'KEYTYPE_ILLUMINATION_UP': 21,
'KEYTYPE_ILLUMINATION_DOWN': 22,
'KEYTYPE_ILLUMINATION_TOGGLE': 23
}
def press(self, key_code):
""" Sends a 'down' event for the specified scan code """
if key_code >= 128:
# Media key
ev = NSEvent.otherEventWithType_location_modifierFlags_timestamp_windowNumber_context_subtype_data1_data2_(
14, # type
(0, 0), # location
0xa00, # flags
0, # timestamp
0, # window
0, # ctx
8, # subtype
((key_code-128) << 16) | (0xa << 8), # data1
-1 # data2
)
Quartz.CGEventPost(0, ev.CGEvent())
else:
# Regular key
# Apply modifiers if necessary
event_flags = 0
if self.current_modifiers["shift"]:
event_flags += Quartz.kCGEventFlagMaskShift
if self.current_modifiers["caps"]:
event_flags += Quartz.kCGEventFlagMaskAlphaShift
if self.current_modifiers["alt"]:
event_flags += Quartz.kCGEventFlagMaskAlternate
if self.current_modifiers["ctrl"]:
event_flags += Quartz.kCGEventFlagMaskControl
if self.current_modifiers["cmd"]:
event_flags += Quartz.kCGEventFlagMaskCommand
# Update modifiers if necessary
if key_code == 0x37: # cmd
self.current_modifiers["cmd"] = True
elif key_code == 0x38 or key_code == 0x3C: # shift or right shift
self.current_modifiers["shift"] = True
elif key_code == 0x39: # caps lock
self.current_modifiers["caps"] = True
elif key_code == 0x3A: # alt
self.current_modifiers["alt"] = True
elif key_code == 0x3B: # ctrl
self.current_modifiers["ctrl"] = True
event = Quartz.CGEventCreateKeyboardEvent(None, key_code, True)
Quartz.CGEventSetFlags(event, event_flags)
Quartz.CGEventPost(Quartz.kCGHIDEventTap, event)
time.sleep(0.01)
def release(self, key_code):
""" Sends an 'up' event for the specified scan code """
if key_code >= 128:
# Media key
ev = NSEvent.otherEventWithType_location_modifierFlags_timestamp_windowNumber_context_subtype_data1_data2_(
14, # type
(0, 0), # location
0xb00, # flags
0, # timestamp
0, # window
0, # ctx
8, # subtype
((key_code-128) << 16) | (0xb << 8), # data1
-1 # data2
)
Quartz.CGEventPost(0, ev.CGEvent())
else:
# Regular key
# Update modifiers if necessary
if key_code == 0x37: # cmd
self.current_modifiers["cmd"] = False
elif key_code == 0x38 or key_code == 0x3C: # shift or right shift
self.current_modifiers["shift"] = False
elif key_code == 0x39: # caps lock
self.current_modifiers["caps"] = False
elif key_code == 0x3A: # alt
self.current_modifiers["alt"] = False
elif key_code == 0x3B: # ctrl
self.current_modifiers["ctrl"] = False
# Apply modifiers if necessary
event_flags = 0
if self.current_modifiers["shift"]:
event_flags += Quartz.kCGEventFlagMaskShift
if self.current_modifiers["caps"]:
event_flags += Quartz.kCGEventFlagMaskAlphaShift
if self.current_modifiers["alt"]:
event_flags += Quartz.kCGEventFlagMaskAlternate
if self.current_modifiers["ctrl"]:
event_flags += Quartz.kCGEventFlagMaskControl
if self.current_modifiers["cmd"]:
event_flags += Quartz.kCGEventFlagMaskCommand
event = Quartz.CGEventCreateKeyboardEvent(None, key_code, False)
Quartz.CGEventSetFlags(event, event_flags)
Quartz.CGEventPost(Quartz.kCGHIDEventTap, event)
time.sleep(0.01)
def map_char(self, character):
if character in self.media_keys:
return (128+self.media_keys[character],[])
else:
return self.key_map.character_to_vk(character)
def map_scan_code(self, scan_code):
if scan_code >= 128:
character = [k for k, v in enumerate(self.media_keys) if v == scan_code-128]
if len(character):
return character[0]
return None
else:
return self.key_map.vk_to_character(scan_code)
class KeyEventListener(object):
def __init__(self, callback, blocking=False):
self.blocking = blocking
self.callback = callback
self.listening = True
self.tap = None
def run(self):
""" Creates a listener and loops while waiting for an event. Intended to run as
a background thread. """
self.tap = Quartz.CGEventTapCreate(
Quartz.kCGSessionEventTap,
Quartz.kCGHeadInsertEventTap,
Quartz.kCGEventTapOptionDefault,
Quartz.CGEventMaskBit(Quartz.kCGEventKeyDown) |
Quartz.CGEventMaskBit(Quartz.kCGEventKeyUp) |
Quartz.CGEventMaskBit(Quartz.kCGEventFlagsChanged),
self.handler,
None)
loopsource = Quartz.CFMachPortCreateRunLoopSource(None, self.tap, 0)
loop = Quartz.CFRunLoopGetCurrent()
Quartz.CFRunLoopAddSource(loop, loopsource, Quartz.kCFRunLoopDefaultMode)
Quartz.CGEventTapEnable(self.tap, True)
while self.listening:
Quartz.CFRunLoopRunInMode(Quartz.kCFRunLoopDefaultMode, 5, False)
def handler(self, proxy, e_type, event, refcon):
scan_code = Quartz.CGEventGetIntegerValueField(event, Quartz.kCGKeyboardEventKeycode)
key_name = name_from_scancode(scan_code)
flags = Quartz.CGEventGetFlags(event)
event_type = ""
is_keypad = (flags & Quartz.kCGEventFlagMaskNumericPad)
if e_type == Quartz.kCGEventKeyDown:
event_type = "down"
elif e_type == Quartz.kCGEventKeyUp:
event_type = "up"
elif e_type == Quartz.kCGEventFlagsChanged:
if key_name.endswith("shift") and (flags & Quartz.kCGEventFlagMaskShift):
event_type = "down"
elif key_name == "caps lock" and (flags & Quartz.kCGEventFlagMaskAlphaShift):
event_type = "down"
elif (key_name.endswith("option") or key_name.endswith("alt")) and (flags & Quartz.kCGEventFlagMaskAlternate):
event_type = "down"
elif key_name == "ctrl" and (flags & Quartz.kCGEventFlagMaskControl):
event_type = "down"
elif key_name == "command" and (flags & Quartz.kCGEventFlagMaskCommand):
event_type = "down"
else:
event_type = "up"
if self.blocking:
return None
self.callback(KeyboardEvent(event_type, scan_code, name=key_name, is_keypad=is_keypad))
return event
key_controller = KeyController()
""" Exported functions below """
def init():
key_controller = KeyController()
def press(scan_code):
""" Sends a 'down' event for the specified scan code """
key_controller.press(scan_code)
def release(scan_code):
""" Sends an 'up' event for the specified scan code """
key_controller.release(scan_code)
def map_name(name):
""" Returns a tuple of (scan_code, modifiers) where ``scan_code`` is a numeric scan code
and ``modifiers`` is an array of string modifier names (like 'shift') """
yield key_controller.map_char(name)
def name_from_scancode(scan_code):
""" Returns the name or character associated with the specified key code """
return key_controller.map_scan_code(scan_code)
def listen(callback):
if not os.geteuid() == 0:
raise OSError("Error 13 - Must be run as administrator")
KeyEventListener(callback).run()
def type_unicode(character):
OUTPUT_SOURCE = Quartz.CGEventSourceCreate(Quartz.kCGEventSourceStateHIDSystemState)
# Key down
event = Quartz.CGEventCreateKeyboardEvent(OUTPUT_SOURCE, 0, True)
Quartz.CGEventKeyboardSetUnicodeString(event, len(character.encode('utf-16-le')) // 2, character)
Quartz.CGEventPost(Quartz.kCGSessionEventTap, event)
# Key up
event = Quartz.CGEventCreateKeyboardEvent(OUTPUT_SOURCE, 0, False)
Quartz.CGEventKeyboardSetUnicodeString(event, len(character.encode('utf-16-le')) // 2, character)
Quartz.CGEventPost(Quartz.kCGSessionEventTap, event)

@ -0,0 +1,173 @@
import os
import datetime
import threading
import Quartz
from ._mouse_event import ButtonEvent, WheelEvent, MoveEvent, LEFT, RIGHT, MIDDLE, X, X2, UP, DOWN
_button_mapping = {
LEFT: (Quartz.kCGMouseButtonLeft, Quartz.kCGEventLeftMouseDown, Quartz.kCGEventLeftMouseUp, Quartz.kCGEventLeftMouseDragged),
RIGHT: (Quartz.kCGMouseButtonRight, Quartz.kCGEventRightMouseDown, Quartz.kCGEventRightMouseUp, Quartz.kCGEventRightMouseDragged),
MIDDLE: (Quartz.kCGMouseButtonCenter, Quartz.kCGEventOtherMouseDown, Quartz.kCGEventOtherMouseUp, Quartz.kCGEventOtherMouseDragged)
}
_button_state = {
LEFT: False,
RIGHT: False,
MIDDLE: False
}
_last_click = {
"time": None,
"button": None,
"position": None,
"click_count": 0
}
class MouseEventListener(object):
def __init__(self, callback, blocking=False):
self.blocking = blocking
self.callback = callback
self.listening = True
def run(self):
""" Creates a listener and loops while waiting for an event. Intended to run as
a background thread. """
self.tap = Quartz.CGEventTapCreate(
Quartz.kCGSessionEventTap,
Quartz.kCGHeadInsertEventTap,
Quartz.kCGEventTapOptionDefault,
Quartz.CGEventMaskBit(Quartz.kCGEventLeftMouseDown) |
Quartz.CGEventMaskBit(Quartz.kCGEventLeftMouseUp) |
Quartz.CGEventMaskBit(Quartz.kCGEventRightMouseDown) |
Quartz.CGEventMaskBit(Quartz.kCGEventRightMouseUp) |
Quartz.CGEventMaskBit(Quartz.kCGEventOtherMouseDown) |
Quartz.CGEventMaskBit(Quartz.kCGEventOtherMouseUp) |
Quartz.CGEventMaskBit(Quartz.kCGEventMouseMoved) |
Quartz.CGEventMaskBit(Quartz.kCGEventScrollWheel),
self.handler,
None)
loopsource = Quartz.CFMachPortCreateRunLoopSource(None, self.tap, 0)
loop = Quartz.CFRunLoopGetCurrent()
Quartz.CFRunLoopAddSource(loop, loopsource, Quartz.kCFRunLoopDefaultMode)
Quartz.CGEventTapEnable(self.tap, True)
while self.listening:
Quartz.CFRunLoopRunInMode(Quartz.kCFRunLoopDefaultMode, 5, False)
def handler(self, proxy, e_type, event, refcon):
# TODO Separate event types by button/wheel/move
scan_code = Quartz.CGEventGetIntegerValueField(event, Quartz.kCGKeyboardEventKeycode)
key_name = name_from_scancode(scan_code)
flags = Quartz.CGEventGetFlags(event)
event_type = ""
is_keypad = (flags & Quartz.kCGEventFlagMaskNumericPad)
if e_type == Quartz.kCGEventKeyDown:
event_type = "down"
elif e_type == Quartz.kCGEventKeyUp:
event_type = "up"
if self.blocking:
return None
self.callback(KeyboardEvent(event_type, scan_code, name=key_name, is_keypad=is_keypad))
return event
# Exports
def init():
""" Initializes mouse state """
pass
def listen(queue):
""" Appends events to the queue (ButtonEvent, WheelEvent, and MoveEvent). """
if not os.geteuid() == 0:
raise OSError("Error 13 - Must be run as administrator")
listener = MouseEventListener(lambda e: queue.put(e) or is_allowed(e.name, e.event_type == KEY_UP))
t = threading.Thread(target=listener.run, args=())
t.daemon = True
t.start()
def press(button=LEFT):
""" Sends a down event for the specified button, using the provided constants """
location = get_position()
button_code, button_down, _, _ = _button_mapping[button]
e = Quartz.CGEventCreateMouseEvent(
None,
button_down,
location,
button_code)
# Check if this is a double-click (same location within the last 300ms)
if _last_click["time"] is not None and datetime.datetime.now() - _last_click["time"] < datetime.timedelta(seconds=0.3) and _last_click["button"] == button and _last_click["position"] == location:
# Repeated Click
_last_click["click_count"] = min(3, _last_click["click_count"]+1)
else:
# Not a double-click - Reset last click
_last_click["click_count"] = 1
Quartz.CGEventSetIntegerValueField(
e,
Quartz.kCGMouseEventClickState,
_last_click["click_count"])
Quartz.CGEventPost(Quartz.kCGHIDEventTap, e)
_button_state[button] = True
_last_click["time"] = datetime.datetime.now()
_last_click["button"] = button
_last_click["position"] = location
def release(button=LEFT):
""" Sends an up event for the specified button, using the provided constants """
location = get_position()
button_code, _, button_up, _ = _button_mapping[button]
e = Quartz.CGEventCreateMouseEvent(
None,
button_up,
location,
button_code)
if _last_click["time"] is not None and _last_click["time"] > datetime.datetime.now() - datetime.timedelta(microseconds=300000) and _last_click["button"] == button and _last_click["position"] == location:
# Repeated Click
Quartz.CGEventSetIntegerValueField(
e,
Quartz.kCGMouseEventClickState,
_last_click["click_count"])
Quartz.CGEventPost(Quartz.kCGHIDEventTap, e)
_button_state[button] = False
def wheel(delta=1):
""" Sends a wheel event for the provided number of clicks. May be negative to reverse
direction. """
location = get_position()
e = Quartz.CGEventCreateMouseEvent(
None,
Quartz.kCGEventScrollWheel,
location,
Quartz.kCGMouseButtonLeft)
e2 = Quartz.CGEventCreateScrollWheelEvent(
None,
Quartz.kCGScrollEventUnitLine,
1,
delta)
Quartz.CGEventPost(Quartz.kCGHIDEventTap, e)
Quartz.CGEventPost(Quartz.kCGHIDEventTap, e2)
def move_to(x, y):
""" Sets the mouse's location to the specified coordinates. """
for b in _button_state:
if _button_state[b]:
e = Quartz.CGEventCreateMouseEvent(
None,
_button_mapping[b][3], # Drag Event
(x, y),
_button_mapping[b][0])
break
else:
e = Quartz.CGEventCreateMouseEvent(
None,
Quartz.kCGEventMouseMoved,
(x, y),
Quartz.kCGMouseButtonLeft)
Quartz.CGEventPost(Quartz.kCGHIDEventTap, e)
def get_position():
""" Returns the mouse's location as a tuple of (x, y). """
e = Quartz.CGEventCreate(None)
point = Quartz.CGEventGetLocation(e)
return (point.x, point.y)

@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
from threading import Thread, Lock
import traceback
import functools
try:
from queue import Queue
except ImportError:
from Queue import Queue
class GenericListener(object):
lock = Lock()
def __init__(self):
self.handlers = []
self.listening = False
self.queue = Queue()
def invoke_handlers(self, event):
for handler in self.handlers:
try:
if handler(event):
# Stop processing this hotkey.
return 1
except Exception as e:
traceback.print_exc()
def start_if_necessary(self):
"""
Starts the listening thread if it wans't already.
"""
self.lock.acquire()
try:
if not self.listening:
self.init()
self.listening = True
self.listening_thread = Thread(target=self.listen)
self.listening_thread.daemon = True
self.listening_thread.start()
self.processing_thread = Thread(target=self.process)
self.processing_thread.daemon = True
self.processing_thread.start()
finally:
self.lock.release()
def pre_process_event(self, event):
raise NotImplementedError('This method should be implemented in the child class.')
def process(self):
"""
Loops over the underlying queue of events and processes them in order.
"""
assert self.queue is not None
while True:
event = self.queue.get()
if self.pre_process_event(event):
self.invoke_handlers(event)
self.queue.task_done()
def add_handler(self, handler):
"""
Adds a function to receive each event captured, starting the capturing
process if necessary.
"""
self.start_if_necessary()
self.handlers.append(handler)
def remove_handler(self, handler):
""" Removes a previously added event handler. """
while handler in self.handlers:
self.handlers.remove(handler)

@ -0,0 +1,53 @@
# -*- coding: utf-8 -*-
from time import time as now
import json
from ._canonical_names import canonical_names, normalize_name
try:
basestring
except NameError:
basestring = str
KEY_DOWN = 'down'
KEY_UP = 'up'
class KeyboardEvent(object):
event_type = None
scan_code = None
name = None
time = None
device = None
modifiers = None
is_keypad = None
def __init__(self, event_type, scan_code, name=None, time=None, device=None, modifiers=None, is_keypad=None):
self.event_type = event_type
self.scan_code = scan_code
self.time = now() if time is None else time
self.device = device
self.is_keypad = is_keypad
self.modifiers = modifiers
if name:
self.name = normalize_name(name)
def to_json(self, ensure_ascii=False):
attrs = dict(
(attr, getattr(self, attr)) for attr in ['event_type', 'scan_code', 'name', 'time', 'device', 'is_keypad']
if not attr.startswith('_') and getattr(self, attr) is not None
)
return json.dumps(attrs, ensure_ascii=ensure_ascii)
def __repr__(self):
return 'KeyboardEvent({} {})'.format(self.name or 'Unknown {}'.format(self.scan_code), self.event_type)
def __eq__(self, other):
return (
isinstance(other, KeyboardEvent)
and self.event_type == other.event_type
and (
not self.scan_code or not other.scan_code or self.scan_code == other.scan_code
) and (
not self.name or not other.name or self.name == other.name
)
)

@ -0,0 +1,827 @@
# -*- coding: utf-8 -*-
"""
Side effects are avoided using two techniques:
- Low level OS requests (keyboard._os_keyboard) are mocked out by rewriting
the functions at that namespace. This includes a list of dummy keys.
- Events are pumped manually by the main test class, and accepted events
are tested against expected values.
Fake user events are appended to `input_events`, passed through
keyboard,_listener.direct_callback, then, if accepted, appended to
`output_events`. Fake OS events (keyboard.press) are processed
and added to `output_events` immediately, mimicking real functionality.
"""
from __future__ import print_function
import unittest
import time
import keyboard
from ._keyboard_event import KeyboardEvent, KEY_DOWN, KEY_UP
dummy_keys = {
'space': [(0, [])],
'a': [(1, [])],
'b': [(2, [])],
'c': [(3, [])],
'A': [(1, ['shift']), (-1, [])],
'B': [(2, ['shift']), (-2, [])],
'C': [(3, ['shift']), (-3, [])],
'alt': [(4, [])],
'left alt': [(4, [])],
'left shift': [(5, [])],
'right shift': [(6, [])],
'left ctrl': [(7, [])],
'backspace': [(8, [])],
'caps lock': [(9, [])],
'+': [(10, [])],
',': [(11, [])],
'_': [(12, [])],
'none': [],
'duplicated': [(20, []), (20, [])],
}
def make_event(event_type, name, scan_code=None, time=0):
return KeyboardEvent(event_type=event_type, scan_code=scan_code or dummy_keys[name][0][0], name=name, time=time)
# Used when manually pumping events.
input_events = []
output_events = []
def send_instant_event(event):
if keyboard._listener.direct_callback(event):
output_events.append(event)
# Mock out side effects.
keyboard._os_keyboard.init = lambda: None
keyboard._os_keyboard.listen = lambda callback: None
keyboard._os_keyboard.map_name = dummy_keys.__getitem__
keyboard._os_keyboard.press = lambda scan_code: send_instant_event(make_event(KEY_DOWN, None, scan_code))
keyboard._os_keyboard.release = lambda scan_code: send_instant_event(make_event(KEY_UP, None, scan_code))
keyboard._os_keyboard.type_unicode = lambda char: output_events.append(KeyboardEvent(event_type=KEY_DOWN, scan_code=999, name=char))
# Shortcuts for defining test inputs and expected outputs.
# Usage: d_shift + d_a + u_a + u_shift
d_a = [make_event(KEY_DOWN, 'a')]
u_a = [make_event(KEY_UP, 'a')]
du_a = d_a+u_a
d_b = [make_event(KEY_DOWN, 'b')]
u_b = [make_event(KEY_UP, 'b')]
du_b = d_b+u_b
d_c = [make_event(KEY_DOWN, 'c')]
u_c = [make_event(KEY_UP, 'c')]
du_c = d_c+u_c
d_ctrl = [make_event(KEY_DOWN, 'left ctrl')]
u_ctrl = [make_event(KEY_UP, 'left ctrl')]
du_ctrl = d_ctrl+u_ctrl
d_shift = [make_event(KEY_DOWN, 'left shift')]
u_shift = [make_event(KEY_UP, 'left shift')]
du_shift = d_shift+u_shift
d_alt = [make_event(KEY_DOWN, 'alt')]
u_alt = [make_event(KEY_UP, 'alt')]
du_alt = d_alt+u_alt
du_backspace = [make_event(KEY_DOWN, 'backspace'), make_event(KEY_UP, 'backspace')]
du_capslock = [make_event(KEY_DOWN, 'caps lock'), make_event(KEY_UP, 'caps lock')]
d_space = [make_event(KEY_DOWN, 'space')]
u_space = [make_event(KEY_UP, 'space')]
du_space = [make_event(KEY_DOWN, 'space'), make_event(KEY_UP, 'space')]
trigger = lambda e=None: keyboard.press(999)
triggered_event = [KeyboardEvent(KEY_DOWN, scan_code=999)]
class TestKeyboard(unittest.TestCase):
def tearDown(self):
keyboard.unhook_all()
#self.assertEquals(keyboard._hooks, {})
#self.assertEquals(keyboard._hotkeys, {})
def setUp(self):
#keyboard._hooks.clear()
#keyboard._hotkeys.clear()
del input_events[:]
del output_events[:]
keyboard._recording = None
keyboard._pressed_events.clear()
keyboard._physically_pressed_keys.clear()
keyboard._logically_pressed_keys.clear()
keyboard._hotkeys.clear()
keyboard._listener.init()
keyboard._word_listeners = {}
def do(self, manual_events, expected=None):
input_events.extend(manual_events)
while input_events:
event = input_events.pop(0)
if keyboard._listener.direct_callback(event):
output_events.append(event)
if expected is not None:
to_names = lambda es: '+'.join(('d' if e.event_type == KEY_DOWN else 'u') + '_' + str(e.scan_code) for e in es)
self.assertEqual(to_names(output_events), to_names(expected))
del output_events[:]
keyboard._listener.queue.join()
def test_event_json(self):
event = make_event(KEY_DOWN, u'á \'"', 999)
import json
self.assertEqual(event, KeyboardEvent(**json.loads(event.to_json())))
def test_is_modifier_name(self):
for name in keyboard.all_modifiers:
self.assertTrue(keyboard.is_modifier(name))
def test_is_modifier_scan_code(self):
for i in range(10):
self.assertEqual(keyboard.is_modifier(i), i in [4, 5, 6, 7])
def test_key_to_scan_codes_brute(self):
for name, entries in dummy_keys.items():
if name in ['none', 'duplicated']: continue
expected = tuple(scan_code for scan_code, modifiers in entries)
self.assertEqual(keyboard.key_to_scan_codes(name), expected)
def test_key_to_scan_code_from_scan_code(self):
for i in range(10):
self.assertEqual(keyboard.key_to_scan_codes(i), (i,))
def test_key_to_scan_code_from_letter(self):
self.assertEqual(keyboard.key_to_scan_codes('a'), (1,))
self.assertEqual(keyboard.key_to_scan_codes('A'), (1,-1))
def test_key_to_scan_code_from_normalized(self):
self.assertEqual(keyboard.key_to_scan_codes('shift'), (5,6))
self.assertEqual(keyboard.key_to_scan_codes('SHIFT'), (5,6))
self.assertEqual(keyboard.key_to_scan_codes('ctrl'), keyboard.key_to_scan_codes('CONTROL'))
def test_key_to_scan_code_from_sided_modifier(self):
self.assertEqual(keyboard.key_to_scan_codes('left shift'), (5,))
self.assertEqual(keyboard.key_to_scan_codes('right shift'), (6,))
def test_key_to_scan_code_underscores(self):
self.assertEqual(keyboard.key_to_scan_codes('_'), (12,))
self.assertEqual(keyboard.key_to_scan_codes('right_shift'), (6,))
def test_key_to_scan_code_error_none(self):
with self.assertRaises(ValueError):
keyboard.key_to_scan_codes(None)
def test_key_to_scan_code_error_empty(self):
with self.assertRaises(ValueError):
keyboard.key_to_scan_codes('')
def test_key_to_scan_code_error_other(self):
with self.assertRaises(ValueError):
keyboard.key_to_scan_codes({})
def test_key_to_scan_code_list(self):
self.assertEqual(keyboard.key_to_scan_codes([10, 5, 'a']), (10, 5, 1))
def test_key_to_scan_code_empty(self):
with self.assertRaises(ValueError):
keyboard.key_to_scan_codes('none')
def test_key_to_scan_code_duplicated(self):
self.assertEqual(keyboard.key_to_scan_codes('duplicated'), (20,))
def test_parse_hotkey_simple(self):
self.assertEqual(keyboard.parse_hotkey('a'), (((1,),),))
self.assertEqual(keyboard.parse_hotkey('A'), (((1,-1),),))
def test_parse_hotkey_separators(self):
self.assertEqual(keyboard.parse_hotkey('+'), keyboard.parse_hotkey('plus'))
self.assertEqual(keyboard.parse_hotkey(','), keyboard.parse_hotkey('comma'))
def test_parse_hotkey_keys(self):
self.assertEqual(keyboard.parse_hotkey('left shift + a'), (((5,), (1,),),))
self.assertEqual(keyboard.parse_hotkey('left shift+a'), (((5,), (1,),),))
def test_parse_hotkey_simple_steps(self):
self.assertEqual(keyboard.parse_hotkey('a,b'), (((1,),),((2,),)))
self.assertEqual(keyboard.parse_hotkey('a, b'), (((1,),),((2,),)))
def test_parse_hotkey_steps(self):
self.assertEqual(keyboard.parse_hotkey('a+b, b+c'), (((1,),(2,)),((2,),(3,))))
def test_parse_hotkey_example(self):
alt_codes = keyboard.key_to_scan_codes('alt')
shift_codes = keyboard.key_to_scan_codes('shift')
a_codes = keyboard.key_to_scan_codes('a')
b_codes = keyboard.key_to_scan_codes('b')
c_codes = keyboard.key_to_scan_codes('c')
self.assertEqual(keyboard.parse_hotkey("alt+shift+a, alt+b, c"), ((alt_codes, shift_codes, a_codes), (alt_codes, b_codes), (c_codes,)))
def test_parse_hotkey_list_scan_codes(self):
self.assertEqual(keyboard.parse_hotkey([1, 2, 3]), (((1,), (2,), (3,)),))
def test_parse_hotkey_deep_list_scan_codes(self):
result = keyboard.parse_hotkey('a')
self.assertEqual(keyboard.parse_hotkey(result), (((1,),),))
def test_parse_hotkey_list_names(self):
self.assertEqual(keyboard.parse_hotkey(['a', 'b', 'c']), (((1,), (2,), (3,)),))
def test_is_pressed_none(self):
self.assertFalse(keyboard.is_pressed('a'))
def test_is_pressed_true(self):
self.do(d_a)
self.assertTrue(keyboard.is_pressed('a'))
def test_is_pressed_true_scan_code_true(self):
self.do(d_a)
self.assertTrue(keyboard.is_pressed(1))
def test_is_pressed_true_scan_code_false(self):
self.do(d_a)
self.assertFalse(keyboard.is_pressed(2))
def test_is_pressed_true_scan_code_invalid(self):
self.do(d_a)
self.assertFalse(keyboard.is_pressed(-1))
def test_is_pressed_false(self):
self.do(d_a+u_a+d_b)
self.assertFalse(keyboard.is_pressed('a'))
self.assertTrue(keyboard.is_pressed('b'))
def test_is_pressed_hotkey_true(self):
self.do(d_shift+d_a)
self.assertTrue(keyboard.is_pressed('shift+a'))
def test_is_pressed_hotkey_false(self):
self.do(d_shift+d_a+u_a)
self.assertFalse(keyboard.is_pressed('shift+a'))
def test_is_pressed_multi_step_fail(self):
self.do(u_a+d_a)
with self.assertRaises(ValueError):
keyboard.is_pressed('a, b')
def test_send_single_press_release(self):
keyboard.send('a', do_press=True, do_release=True)
self.do([], d_a+u_a)
def test_send_single_press(self):
keyboard.send('a', do_press=True, do_release=False)
self.do([], d_a)
def test_send_single_release(self):
keyboard.send('a', do_press=False, do_release=True)
self.do([], u_a)
def test_send_single_none(self):
keyboard.send('a', do_press=False, do_release=False)
self.do([], [])
def test_press(self):
keyboard.press('a')
self.do([], d_a)
def test_release(self):
keyboard.release('a')
self.do([], u_a)
def test_press_and_release(self):
keyboard.press_and_release('a')
self.do([], d_a+u_a)
def test_send_modifier_press_release(self):
keyboard.send('ctrl+a', do_press=True, do_release=True)
self.do([], d_ctrl+d_a+u_a+u_ctrl)
def test_send_modifiers_release(self):
keyboard.send('ctrl+shift+a', do_press=False, do_release=True)
self.do([], u_a+u_shift+u_ctrl)
def test_call_later(self):
triggered = []
def fn(arg1, arg2):
assert arg1 == 1 and arg2 == 2
triggered.append(True)
keyboard.call_later(fn, (1, 2), 0.01)
self.assertFalse(triggered)
time.sleep(0.05)
self.assertTrue(triggered)
def test_hook_nonblocking(self):
self.i = 0
def count(e):
self.assertEqual(e.name, 'a')
self.i += 1
hook = keyboard.hook(count, suppress=False)
self.do(d_a+u_a, d_a+u_a)
self.assertEqual(self.i, 2)
keyboard.unhook(hook)
self.do(d_a+u_a, d_a+u_a)
self.assertEqual(self.i, 2)
keyboard.hook(count, suppress=False)
self.do(d_a+u_a, d_a+u_a)
self.assertEqual(self.i, 4)
keyboard.unhook_all()
self.do(d_a+u_a, d_a+u_a)
self.assertEqual(self.i, 4)
def test_hook_blocking(self):
self.i = 0
def count(e):
self.assertIn(e.name, ['a', 'b'])
self.i += 1
return e.name == 'b'
hook = keyboard.hook(count, suppress=True)
self.do(d_a+d_b, d_b)
self.assertEqual(self.i, 2)
keyboard.unhook(hook)
self.do(d_a+d_b, d_a+d_b)
self.assertEqual(self.i, 2)
keyboard.hook(count, suppress=True)
self.do(d_a+d_b, d_b)
self.assertEqual(self.i, 4)
keyboard.unhook_all()
self.do(d_a+d_b, d_a+d_b)
self.assertEqual(self.i, 4)
def test_on_press_nonblocking(self):
keyboard.on_press(lambda e: self.assertEqual(e.name, 'a') and self.assertEqual(e.event_type, KEY_DOWN))
self.do(d_a+u_a)
def test_on_press_blocking(self):
keyboard.on_press(lambda e: e.scan_code == 1, suppress=True)
self.do([make_event(KEY_DOWN, 'A', -1)] + d_a, d_a)
def test_on_release(self):
keyboard.on_release(lambda e: self.assertEqual(e.name, 'a') and self.assertEqual(e.event_type, KEY_UP))
self.do(d_a+u_a)
def test_hook_key_invalid(self):
with self.assertRaises(ValueError):
keyboard.hook_key('invalid', lambda e: None)
def test_hook_key_nonblocking(self):
self.i = 0
def count(event):
self.i += 1
hook = keyboard.hook_key('A', count)
self.do(d_a)
self.assertEqual(self.i, 1)
self.do(u_a+d_b)
self.assertEqual(self.i, 2)
self.do([make_event(KEY_DOWN, 'A', -1)])
self.assertEqual(self.i, 3)
keyboard.unhook_key(hook)
self.do(d_a)
self.assertEqual(self.i, 3)
def test_hook_key_blocking(self):
self.i = 0
def count(event):
self.i += 1
return event.scan_code == 1
hook = keyboard.hook_key('A', count, suppress=True)
self.do(d_a, d_a)
self.assertEqual(self.i, 1)
self.do(u_a+d_b, u_a+d_b)
self.assertEqual(self.i, 2)
self.do([make_event(KEY_DOWN, 'A', -1)], [])
self.assertEqual(self.i, 3)
keyboard.unhook_key(hook)
self.do([make_event(KEY_DOWN, 'A', -1)], [make_event(KEY_DOWN, 'A', -1)])
self.assertEqual(self.i, 3)
def test_on_press_key_nonblocking(self):
keyboard.on_press_key('A', lambda e: self.assertEqual(e.name, 'a') and self.assertEqual(e.event_type, KEY_DOWN))
self.do(d_a+u_a+d_b+u_b)
def test_on_press_key_blocking(self):
keyboard.on_press_key('A', lambda e: e.scan_code == 1, suppress=True)
self.do([make_event(KEY_DOWN, 'A', -1)] + d_a, d_a)
def test_on_release_key(self):
keyboard.on_release_key('a', lambda e: self.assertEqual(e.name, 'a') and self.assertEqual(e.event_type, KEY_UP))
self.do(d_a+u_a)
def test_block_key(self):
blocked = keyboard.block_key('a')
self.do(d_a+d_b, d_b)
self.do([make_event(KEY_DOWN, 'A', -1)], [make_event(KEY_DOWN, 'A', -1)])
keyboard.unblock_key(blocked)
self.do(d_a+d_b, d_a+d_b)
def test_block_key_ambiguous(self):
keyboard.block_key('A')
self.do(d_a+d_b, d_b)
self.do([make_event(KEY_DOWN, 'A', -1)], [])
def test_remap_key_simple(self):
mapped = keyboard.remap_key('a', 'b')
self.do(d_a+d_c+u_a, d_b+d_c+u_b)
keyboard.unremap_key(mapped)
self.do(d_a+d_c+u_a, d_a+d_c+u_a)
def test_remap_key_ambiguous(self):
keyboard.remap_key('A', 'b')
self.do(d_a+d_b, d_b+d_b)
self.do([make_event(KEY_DOWN, 'A', -1)], d_b)
def test_remap_key_multiple(self):
mapped = keyboard.remap_key('a', 'shift+b')
self.do(d_a+d_c+u_a, d_shift+d_b+d_c+u_b+u_shift)
keyboard.unremap_key(mapped)
self.do(d_a+d_c+u_a, d_a+d_c+u_a)
def test_stash_state(self):
self.do(d_a+d_shift)
self.assertEqual(sorted(keyboard.stash_state()), [1, 5])
self.do([], u_a+u_shift)
def test_restore_state(self):
self.do(d_b)
keyboard.restore_state([1, 5])
self.do([], u_b+d_a+d_shift)
def test_restore_modifieres(self):
self.do(d_b)
keyboard.restore_modifiers([1, 5])
self.do([], u_b+d_shift)
def test_write_simple(self):
keyboard.write('a', exact=False)
self.do([], d_a+u_a)
def test_write_multiple(self):
keyboard.write('ab', exact=False)
self.do([], d_a+u_a+d_b+u_b)
def test_write_modifiers(self):
keyboard.write('Ab', exact=False)
self.do([], d_shift+d_a+u_a+u_shift+d_b+u_b)
# restore_state_after has been removed after the introduction of `restore_modifiers`.
#def test_write_stash_not_restore(self):
# self.do(d_shift)
# keyboard.write('a', restore_state_after=False, exact=False)
# self.do([], u_shift+d_a+u_a)
def test_write_stash_restore(self):
self.do(d_shift)
keyboard.write('a', exact=False)
self.do([], u_shift+d_a+u_a+d_shift)
def test_write_multiple(self):
last_time = time.time()
keyboard.write('ab', delay=0.01, exact=False)
self.do([], d_a+u_a+d_b+u_b)
self.assertGreater(time.time() - last_time, 0.015)
def test_write_unicode_explicit(self):
keyboard.write('ab', exact=True)
self.do([], [KeyboardEvent(event_type=KEY_DOWN, scan_code=999, name='a'), KeyboardEvent(event_type=KEY_DOWN, scan_code=999, name='b')])
def test_write_unicode_fallback(self):
keyboard.write(u'áb', exact=False)
self.do([], [KeyboardEvent(event_type=KEY_DOWN, scan_code=999, name=u'á')]+d_b+u_b)
def test_start_stop_recording(self):
keyboard.start_recording()
self.do(d_a+u_a)
self.assertEqual(keyboard.stop_recording(), d_a+u_a)
def test_stop_recording_error(self):
with self.assertRaises(ValueError):
keyboard.stop_recording()
def test_record(self):
queue = keyboard._queue.Queue()
def process():
queue.put(keyboard.record('space', suppress=True))
from threading import Thread
t = Thread(target=process)
t.daemon = True
t.start()
# 0.01s sleep failed once already. Better solutions?
time.sleep(0.01)
self.do(du_a+du_b+du_space, du_a+du_b)
self.assertEqual(queue.get(timeout=0.5), du_a+du_b+du_space)
def test_play_nodelay(self):
keyboard.play(d_a+u_a, 0)
self.do([], d_a+u_a)
def test_play_stash(self):
self.do(d_ctrl)
keyboard.play(d_a+u_a, 0)
self.do([], u_ctrl+d_a+u_a+d_ctrl)
def test_play_delay(self):
last_time = time.time()
events = [make_event(KEY_DOWN, 'a', 1, 100), make_event(KEY_UP, 'a', 1, 100.01)]
keyboard.play(events, 1)
self.do([], d_a+u_a)
self.assertGreater(time.time() - last_time, 0.005)
def test_get_typed_strings_simple(self):
events = du_a+du_b+du_backspace+d_shift+du_a+u_shift+du_space+du_ctrl+du_a
self.assertEqual(list(keyboard.get_typed_strings(events)), ['aA ', 'a'])
def test_get_typed_strings_backspace(self):
events = du_a+du_b+du_backspace
self.assertEqual(list(keyboard.get_typed_strings(events)), ['a'])
events = du_backspace+du_a+du_b
self.assertEqual(list(keyboard.get_typed_strings(events)), ['ab'])
def test_get_typed_strings_shift(self):
events = d_shift+du_a+du_b+u_shift+du_space+du_ctrl+du_a
self.assertEqual(list(keyboard.get_typed_strings(events)), ['AB ', 'a'])
def test_get_typed_strings_all(self):
events = du_a+du_b+du_backspace+d_shift+du_a+du_capslock+du_b+u_shift+du_space+du_ctrl+du_a
self.assertEqual(list(keyboard.get_typed_strings(events)), ['aAb ', 'A'])
def test_get_hotkey_name_simple(self):
self.assertEqual(keyboard.get_hotkey_name(['a']), 'a')
def test_get_hotkey_name_modifiers(self):
self.assertEqual(keyboard.get_hotkey_name(['a', 'shift', 'ctrl']), 'ctrl+shift+a')
def test_get_hotkey_name_normalize(self):
self.assertEqual(keyboard.get_hotkey_name(['SHIFT', 'left ctrl']), 'ctrl+shift')
def test_get_hotkey_name_plus(self):
self.assertEqual(keyboard.get_hotkey_name(['+']), 'plus')
def test_get_hotkey_name_duplicated(self):
self.assertEqual(keyboard.get_hotkey_name(['+', 'plus']), 'plus')
def test_get_hotkey_name_full(self):
self.assertEqual(keyboard.get_hotkey_name(['+', 'left ctrl', 'shift', 'WIN', 'right alt']), 'ctrl+alt+shift+windows+plus')
def test_get_hotkey_name_multiple(self):
self.assertEqual(keyboard.get_hotkey_name(['ctrl', 'b', '!', 'a']), 'ctrl+!+a+b')
def test_get_hotkey_name_from_pressed(self):
self.do(du_c+d_ctrl+d_a+d_b)
self.assertEqual(keyboard.get_hotkey_name(), 'ctrl+a+b')
def test_read_hotkey(self):
queue = keyboard._queue.Queue()
def process():
queue.put(keyboard.read_hotkey())
from threading import Thread
t = Thread(target=process)
t.daemon = True
t.start()
time.sleep(0.01)
self.do(d_ctrl+d_a+d_b+u_ctrl)
self.assertEqual(queue.get(timeout=0.5), 'ctrl+a+b')
def test_read_event(self):
queue = keyboard._queue.Queue()
def process():
queue.put(keyboard.read_event(suppress=True))
from threading import Thread
t = Thread(target=process)
t.daemon = True
t.start()
time.sleep(0.01)
self.do(d_a, [])
self.assertEqual(queue.get(timeout=0.5), d_a[0])
def test_read_key(self):
queue = keyboard._queue.Queue()
def process():
queue.put(keyboard.read_key(suppress=True))
from threading import Thread
t = Thread(target=process)
t.daemon = True
t.start()
time.sleep(0.01)
self.do(d_a, [])
self.assertEqual(queue.get(timeout=0.5), 'a')
def test_wait_infinite(self):
self.triggered = False
def process():
keyboard.wait()
self.triggered = True
from threading import Thread
t = Thread(target=process)
t.daemon = True # Yep, we are letting this thread loose.
t.start()
time.sleep(0.01)
self.assertFalse(self.triggered)
def test_wait_until_success(self):
queue = keyboard._queue.Queue()
def process():
queue.put(keyboard.wait(queue.get(timeout=0.5), suppress=True) or True)
from threading import Thread
t = Thread(target=process)
t.daemon = True
t.start()
queue.put('a')
time.sleep(0.01)
self.do(d_a, [])
self.assertTrue(queue.get(timeout=0.5))
def test_wait_until_fail(self):
def process():
keyboard.wait('a', suppress=True)
self.fail()
from threading import Thread
t = Thread(target=process)
t.daemon = True # Yep, we are letting this thread loose.
t.start()
time.sleep(0.01)
self.do(d_b)
def test_add_hotkey_single_step_suppress_allow(self):
keyboard.add_hotkey('a', lambda: trigger() or True, suppress=True)
self.do(d_a, triggered_event+d_a)
def test_add_hotkey_single_step_suppress_args_allow(self):
arg = object()
keyboard.add_hotkey('a', lambda a: self.assertIs(a, arg) or trigger() or True, args=(arg,), suppress=True)
self.do(d_a, triggered_event+d_a)
def test_add_hotkey_single_step_suppress_single(self):
keyboard.add_hotkey('a', trigger, suppress=True)
self.do(d_a, triggered_event)
def test_add_hotkey_single_step_suppress_removed(self):
keyboard.remove_hotkey(keyboard.add_hotkey('a', trigger, suppress=True))
self.do(d_a, d_a)
def test_add_hotkey_single_step_suppress_removed(self):
keyboard.remove_hotkey(keyboard.add_hotkey('ctrl+a', trigger, suppress=True))
self.do(d_ctrl+d_a, d_ctrl+d_a)
self.assertEqual(keyboard._listener.filtered_modifiers[dummy_keys['left ctrl'][0][0]], 0)
def test_remove_hotkey_internal(self):
remove = keyboard.add_hotkey('shift+a', trigger, suppress=True)
self.assertTrue(all(keyboard._listener.blocking_hotkeys.values()))
self.assertTrue(all(keyboard._listener.filtered_modifiers.values()))
self.assertNotEqual(keyboard._hotkeys, {})
remove()
self.assertTrue(not any(keyboard._listener.filtered_modifiers.values()))
self.assertTrue(not any(keyboard._listener.blocking_hotkeys.values()))
self.assertEqual(keyboard._hotkeys, {})
def test_remove_hotkey_internal_multistep_start(self):
remove = keyboard.add_hotkey('shift+a, b', trigger, suppress=True)
self.assertTrue(all(keyboard._listener.blocking_hotkeys.values()))
self.assertTrue(all(keyboard._listener.filtered_modifiers.values()))
self.assertNotEqual(keyboard._hotkeys, {})
remove()
self.assertTrue(not any(keyboard._listener.filtered_modifiers.values()))
self.assertTrue(not any(keyboard._listener.blocking_hotkeys.values()))
self.assertEqual(keyboard._hotkeys, {})
def test_remove_hotkey_internal_multistep_end(self):
remove = keyboard.add_hotkey('shift+a, b', trigger, suppress=True)
self.do(d_shift+du_a+u_shift)
self.assertTrue(any(keyboard._listener.blocking_hotkeys.values()))
self.assertTrue(not any(keyboard._listener.filtered_modifiers.values()))
self.assertNotEqual(keyboard._hotkeys, {})
remove()
self.assertTrue(not any(keyboard._listener.filtered_modifiers.values()))
self.assertTrue(not any(keyboard._listener.blocking_hotkeys.values()))
self.assertEqual(keyboard._hotkeys, {})
def test_add_hotkey_single_step_suppress_with_modifiers(self):
keyboard.add_hotkey('ctrl+shift+a', trigger, suppress=True)
self.do(d_ctrl+d_shift+d_a, triggered_event)
def test_add_hotkey_single_step_suppress_with_modifiers_fail_unrelated_modifier(self):
keyboard.add_hotkey('ctrl+shift+a', trigger, suppress=True)
self.do(d_ctrl+d_shift+u_shift+d_a, d_shift+u_shift+d_ctrl+d_a)
def test_add_hotkey_single_step_suppress_with_modifiers_fail_unrelated_key(self):
keyboard.add_hotkey('ctrl+shift+a', trigger, suppress=True)
self.do(d_ctrl+d_shift+du_b, d_shift+d_ctrl+du_b)
def test_add_hotkey_single_step_suppress_with_modifiers_unrelated_key(self):
keyboard.add_hotkey('ctrl+shift+a', trigger, suppress=True)
self.do(d_ctrl+d_shift+du_b+d_a, d_shift+d_ctrl+du_b+triggered_event)
def test_add_hotkey_single_step_suppress_with_modifiers_release(self):
keyboard.add_hotkey('ctrl+shift+a', trigger, suppress=True)
self.do(d_ctrl+d_shift+du_b+d_a+u_ctrl+u_shift, d_shift+d_ctrl+du_b+triggered_event+u_ctrl+u_shift)
def test_add_hotkey_single_step_suppress_with_modifiers_out_of_order(self):
keyboard.add_hotkey('ctrl+shift+a', trigger, suppress=True)
self.do(d_shift+d_ctrl+d_a, triggered_event)
def test_add_hotkey_single_step_suppress_with_modifiers_repeated(self):
keyboard.add_hotkey('ctrl+a', trigger, suppress=True)
self.do(d_ctrl+du_a+du_b+du_a, triggered_event+d_ctrl+du_b+triggered_event)
def test_add_hotkey_single_step_suppress_with_modifiers_release(self):
keyboard.add_hotkey('ctrl+a', trigger, suppress=True, trigger_on_release=True)
self.do(d_ctrl+du_a+du_b+du_a, triggered_event+d_ctrl+du_b+triggered_event)
def test_add_hotkey_single_step_suppress_with_modifier_superset_release(self):
keyboard.add_hotkey('ctrl+a', trigger, suppress=True, trigger_on_release=True)
self.do(d_ctrl+d_shift+du_a+u_shift+u_ctrl, d_ctrl+d_shift+du_a+u_shift+u_ctrl)
def test_add_hotkey_single_step_suppress_with_modifier_superset(self):
keyboard.add_hotkey('ctrl+a', trigger, suppress=True)
self.do(d_ctrl+d_shift+du_a+u_shift+u_ctrl, d_ctrl+d_shift+du_a+u_shift+u_ctrl)
def test_add_hotkey_single_step_timeout(self):
keyboard.add_hotkey('a', trigger, timeout=1, suppress=True)
self.do(du_a, triggered_event)
def test_add_hotkey_multi_step_first_timeout(self):
keyboard.add_hotkey('a, b', trigger, timeout=0.01, suppress=True)
time.sleep(0.03)
self.do(du_a+du_b, triggered_event)
def test_add_hotkey_multi_step_last_timeout(self):
keyboard.add_hotkey('a, b', trigger, timeout=0.01, suppress=True)
self.do(du_a, [])
time.sleep(0.05)
self.do(du_b, du_a+du_b)
def test_add_hotkey_multi_step_success_timeout(self):
keyboard.add_hotkey('a, b', trigger, timeout=0.05, suppress=True)
self.do(du_a, [])
time.sleep(0.01)
self.do(du_b, triggered_event)
def test_add_hotkey_multi_step_suffix_timeout(self):
keyboard.add_hotkey('a, b, a', trigger, timeout=0.01, suppress=True)
self.do(du_a+du_b, [])
time.sleep(0.05)
self.do(du_a, du_a+du_b)
self.do(du_b+du_a, triggered_event)
def test_add_hotkey_multi_step_allow(self):
keyboard.add_hotkey('a, b', lambda: trigger() or True, suppress=True)
self.do(du_a+du_b, triggered_event+du_a+du_b)
def test_add_hotkey_single_step_nonsuppress(self):
queue = keyboard._queue.Queue()
keyboard.add_hotkey('ctrl+shift+a+b', lambda: queue.put(True), suppress=False)
self.do(d_shift+d_ctrl+d_a+d_b)
self.assertTrue(queue.get(timeout=0.5))
def test_add_hotkey_single_step_nonsuppress_repeated(self):
queue = keyboard._queue.Queue()
keyboard.add_hotkey('ctrl+shift+a+b', lambda: queue.put(True), suppress=False)
self.do(d_shift+d_ctrl+d_a+d_b)
self.do(d_shift+d_ctrl+d_a+d_b)
self.assertTrue(queue.get(timeout=0.5))
self.assertTrue(queue.get(timeout=0.5))
def test_add_hotkey_single_step_nosuppress_with_modifiers_out_of_order(self):
queue = keyboard._queue.Queue()
keyboard.add_hotkey('ctrl+shift+a', lambda: queue.put(True), suppress=False)
self.do(d_shift+d_ctrl+d_a)
self.assertTrue(queue.get(timeout=0.5))
def test_add_hotkey_single_step_suppress_regression_1(self):
keyboard.add_hotkey('a', trigger, suppress=True)
self.do(d_c+d_a+u_c+u_a, d_c+d_a+u_c+u_a)
def test_remap_hotkey_single(self):
keyboard.remap_hotkey('a', 'b')
self.do(d_a+u_a, d_b+u_b)
def test_remap_hotkey_complex_dst(self):
keyboard.remap_hotkey('a', 'ctrl+b, c')
self.do(d_a+u_a, d_ctrl+du_b+u_ctrl+du_c)
def test_remap_hotkey_modifiers(self):
keyboard.remap_hotkey('ctrl+shift+a', 'b')
self.do(d_ctrl+d_shift+d_a+u_a, du_b)
def test_remap_hotkey_modifiers_repeat(self):
keyboard.remap_hotkey('ctrl+shift+a', 'b')
self.do(d_ctrl+d_shift+du_a+du_a, du_b+du_b)
def test_remap_hotkey_modifiers_state(self):
keyboard.remap_hotkey('ctrl+shift+a', 'b')
self.do(d_ctrl+d_shift+du_c+du_a+du_a, d_shift+d_ctrl+du_c+u_shift+u_ctrl+du_b+d_ctrl+d_shift+u_shift+u_ctrl+du_b+d_ctrl+d_shift)
def test_remap_hotkey_release_incomplete(self):
keyboard.remap_hotkey('a', 'b', trigger_on_release=True)
self.do(d_a, [])
def test_remap_hotkey_release_complete(self):
keyboard.remap_hotkey('a', 'b', trigger_on_release=True)
self.do(du_a, du_b)
def test_parse_hotkey_combinations_scan_code(self):
self.assertEqual(keyboard.parse_hotkey_combinations(30), (((30,),),))
def test_parse_hotkey_combinations_single(self):
self.assertEqual(keyboard.parse_hotkey_combinations('a'), (((1,),),))
def test_parse_hotkey_combinations_single_modifier(self):
self.assertEqual(keyboard.parse_hotkey_combinations('shift+a'), (((1, 5), (1, 6)),))
def test_parse_hotkey_combinations_single_modifiers(self):
self.assertEqual(keyboard.parse_hotkey_combinations('shift+ctrl+a'), (((1, 5, 7), (1, 6, 7)),))
def test_parse_hotkey_combinations_multi(self):
self.assertEqual(keyboard.parse_hotkey_combinations('a, b'), (((1,),), ((2,),)))
def test_parse_hotkey_combinations_multi_modifier(self):
self.assertEqual(keyboard.parse_hotkey_combinations('shift+a, b'), (((1, 5), (1, 6)), ((2,),)))
def test_parse_hotkey_combinations_list_list(self):
self.assertEqual(keyboard.parse_hotkey_combinations(keyboard.parse_hotkey_combinations('a, b')), keyboard.parse_hotkey_combinations('a, b'))
def test_parse_hotkey_combinations_fail_empty(self):
with self.assertRaises(ValueError):
keyboard.parse_hotkey_combinations('')
def test_add_hotkey_multistep_suppress_incomplete(self):
keyboard.add_hotkey('a, b', trigger, suppress=True)
self.do(du_a, [])
self.assertEqual(keyboard._listener.blocking_hotkeys[(1,)], [])
self.assertEqual(len(keyboard._listener.blocking_hotkeys[(2,)]), 1)
def test_add_hotkey_multistep_suppress_incomplete(self):
keyboard.add_hotkey('a, b', trigger, suppress=True)
self.do(du_a+du_b, triggered_event)
def test_add_hotkey_multistep_suppress_modifier(self):
keyboard.add_hotkey('shift+a, b', trigger, suppress=True)
self.do(d_shift+du_a+u_shift+du_b, triggered_event)
def test_add_hotkey_multistep_suppress_fail(self):
keyboard.add_hotkey('a, b', trigger, suppress=True)
self.do(du_a+du_c, du_a+du_c)
def test_add_hotkey_multistep_suppress_three_steps(self):
keyboard.add_hotkey('a, b, c', trigger, suppress=True)
self.do(du_a+du_b+du_c, triggered_event)
def test_add_hotkey_multistep_suppress_repeated_prefix(self):
keyboard.add_hotkey('a, a, c', trigger, suppress=True, trigger_on_release=True)
self.do(du_a+du_a+du_c, triggered_event)
def test_add_hotkey_multistep_suppress_repeated_key(self):
keyboard.add_hotkey('a, b', trigger, suppress=True)
self.do(du_a+du_a+du_b, du_a+triggered_event)
self.assertEqual(keyboard._listener.blocking_hotkeys[(2,)], [])
self.assertEqual(len(keyboard._listener.blocking_hotkeys[(1,)]), 1)
def test_add_hotkey_multi_step_suppress_regression_1(self):
keyboard.add_hotkey('a, b', trigger, suppress=True)
self.do(d_c+d_a+u_c+u_a+du_c, d_c+d_a+u_c+u_a+du_c)
def test_add_hotkey_multi_step_suppress_replays(self):
keyboard.add_hotkey('a, b, c', trigger, suppress=True)
self.do(du_a+du_b+du_a+du_b+du_space, du_a+du_b+du_a+du_b+du_space)
def test_add_word_listener_success(self):
queue = keyboard._queue.Queue()
def free():
queue.put(1)
keyboard.add_word_listener('abc', free)
self.do(du_a+du_b+du_c+du_space)
self.assertTrue(queue.get(timeout=0.5))
def test_add_word_listener_no_trigger_fail(self):
queue = keyboard._queue.Queue()
def free():
queue.put(1)
keyboard.add_word_listener('abc', free)
self.do(du_a+du_b+du_c)
with self.assertRaises(keyboard._queue.Empty):
queue.get(timeout=0.01)
def test_add_word_listener_timeout_fail(self):
queue = keyboard._queue.Queue()
def free():
queue.put(1)
keyboard.add_word_listener('abc', free, timeout=1)
self.do(du_a+du_b+du_c+[make_event(KEY_DOWN, name='space', time=2)])
with self.assertRaises(keyboard._queue.Empty):
queue.get(timeout=0.01)
def test_duplicated_word_listener(self):
keyboard.add_word_listener('abc', trigger)
keyboard.add_word_listener('abc', trigger)
def test_add_word_listener_remove(self):
queue = keyboard._queue.Queue()
def free():
queue.put(1)
keyboard.add_word_listener('abc', free)
keyboard.remove_word_listener('abc')
self.do(du_a+du_b+du_c+du_space)
with self.assertRaises(keyboard._queue.Empty):
queue.get(timeout=0.01)
def test_add_word_listener_suffix_success(self):
queue = keyboard._queue.Queue()
def free():
queue.put(1)
keyboard.add_word_listener('abc', free, match_suffix=True)
self.do(du_a+du_a+du_b+du_c+du_space)
self.assertTrue(queue.get(timeout=0.5))
def test_add_word_listener_suffix_fail(self):
queue = keyboard._queue.Queue()
def free():
queue.put(1)
keyboard.add_word_listener('abc', free)
self.do(du_a+du_a+du_b+du_c)
with self.assertRaises(keyboard._queue.Empty):
queue.get(timeout=0.01)
#def test_add_abbreviation(self):
# keyboard.add_abbreviation('abc', 'aaa')
# self.do(du_a+du_b+du_c+du_space, [])
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
from collections import namedtuple
LEFT = 'left'
RIGHT = 'right'
MIDDLE = 'middle'
WHEEL = 'wheel'
X = 'x'
X2 = 'x2'
UP = 'up'
DOWN = 'down'
DOUBLE = 'double'
VERTICAL = 'vertical'
HORIZONTAL = 'horizontal'
ButtonEvent = namedtuple('ButtonEvent', ['event_type', 'button', 'time'])
WheelEvent = namedtuple('WheelEvent', ['delta', 'time'])
MoveEvent = namedtuple('MoveEvent', ['x', 'y', 'time'])

@ -0,0 +1,271 @@
# -*- coding: utf-8 -*-
import unittest
import time
from ._mouse_event import MoveEvent, ButtonEvent, WheelEvent, LEFT, RIGHT, MIDDLE, X, X2, UP, DOWN, DOUBLE
from keyboard import mouse
class FakeOsMouse(object):
def __init__(self):
self.append = None
self.position = (0, 0)
self.queue = None
self.init = lambda: None
def listen(self, queue):
self.listening = True
self.queue = queue
def press(self, button):
self.append((DOWN, button))
def release(self, button):
self.append((UP, button))
def get_position(self):
return self.position
def move_to(self, x, y):
self.append(('move', (x, y)))
self.position = (x, y)
def wheel(self, delta):
self.append(('wheel', delta))
def move_relative(self, x, y):
self.position = (self.position[0] + x, self.position[1] + y)
class TestMouse(unittest.TestCase):
@staticmethod
def setUpClass():
mouse._os_mouse= FakeOsMouse()
mouse._listener.start_if_necessary()
assert mouse._os_mouse.listening
def setUp(self):
self.events = []
mouse._pressed_events.clear()
mouse._os_mouse.append = self.events.append
def tearDown(self):
mouse.unhook_all()
# Make sure there's no spill over between tests.
self.wait_for_events_queue()
def wait_for_events_queue(self):
mouse._listener.queue.join()
def flush_events(self):
self.wait_for_events_queue()
events = list(self.events)
# Ugly, but requried to work in Python2. Python3 has list.clear
del self.events[:]
return events
def press(self, button=LEFT):
mouse._os_mouse.queue.put(ButtonEvent(DOWN, button, time.time()))
self.wait_for_events_queue()
def release(self, button=LEFT):
mouse._os_mouse.queue.put(ButtonEvent(UP, button, time.time()))
self.wait_for_events_queue()
def double_click(self, button=LEFT):
mouse._os_mouse.queue.put(ButtonEvent(DOUBLE, button, time.time()))
self.wait_for_events_queue()
def click(self, button=LEFT):
self.press(button)
self.release(button)
def wheel(self, delta=1):
mouse._os_mouse.queue.put(WheelEvent(delta, time.time()))
self.wait_for_events_queue()
def move(self, x=0, y=0):
mouse._os_mouse.queue.put(MoveEvent(x, y, time.time()))
self.wait_for_events_queue()
def test_hook(self):
events = []
self.press()
mouse.hook(events.append)
self.press()
mouse.unhook(events.append)
self.press()
self.assertEqual(len(events), 1)
def test_is_pressed(self):
self.assertFalse(mouse.is_pressed())
self.press()
self.assertTrue(mouse.is_pressed())
self.release()
self.press(X2)
self.assertFalse(mouse.is_pressed())
self.assertTrue(mouse.is_pressed(X2))
self.press(X2)
self.assertTrue(mouse.is_pressed(X2))
self.release(X2)
self.release(X2)
self.assertFalse(mouse.is_pressed(X2))
def test_buttons(self):
mouse.press()
self.assertEqual(self.flush_events(), [(DOWN, LEFT)])
mouse.release()
self.assertEqual(self.flush_events(), [(UP, LEFT)])
mouse.click()
self.assertEqual(self.flush_events(), [(DOWN, LEFT), (UP, LEFT)])
mouse.double_click()
self.assertEqual(self.flush_events(), [(DOWN, LEFT), (UP, LEFT), (DOWN, LEFT), (UP, LEFT)])
mouse.right_click()
self.assertEqual(self.flush_events(), [(DOWN, RIGHT), (UP, RIGHT)])
mouse.click(RIGHT)
self.assertEqual(self.flush_events(), [(DOWN, RIGHT), (UP, RIGHT)])
mouse.press(X2)
self.assertEqual(self.flush_events(), [(DOWN, X2)])
def test_position(self):
self.assertEqual(mouse.get_position(), mouse._os_mouse.get_position())
def test_move(self):
mouse.move(0, 0)
self.assertEqual(mouse._os_mouse.get_position(), (0, 0))
mouse.move(100, 500)
self.assertEqual(mouse._os_mouse.get_position(), (100, 500))
mouse.move(1, 2, False)
self.assertEqual(mouse._os_mouse.get_position(), (101, 502))
mouse.move(0, 0)
mouse.move(100, 499, True, duration=0.01)
self.assertEqual(mouse._os_mouse.get_position(), (100, 499))
mouse.move(100, 1, False, duration=0.01)
self.assertEqual(mouse._os_mouse.get_position(), (200, 500))
mouse.move(0, 0, False, duration=0.01)
self.assertEqual(mouse._os_mouse.get_position(), (200, 500))
def triggers(self, fn, events, **kwargs):
self.triggered = False
def callback():
self.triggered = True
handler = fn(callback, **kwargs)
for event_type, arg in events:
if event_type == DOWN:
self.press(arg)
elif event_type == UP:
self.release(arg)
elif event_type == DOUBLE:
self.double_click(arg)
elif event_type == 'WHEEL':
self.wheel()
mouse._listener.remove_handler(handler)
return self.triggered
def test_on_button(self):
self.assertTrue(self.triggers(mouse.on_button, [(DOWN, LEFT)]))
self.assertTrue(self.triggers(mouse.on_button, [(DOWN, RIGHT)]))
self.assertTrue(self.triggers(mouse.on_button, [(DOWN, X)]))
self.assertFalse(self.triggers(mouse.on_button, [('WHEEL', '')]))
self.assertFalse(self.triggers(mouse.on_button, [(DOWN, X)], buttons=MIDDLE))
self.assertTrue(self.triggers(mouse.on_button, [(DOWN, MIDDLE)], buttons=MIDDLE))
self.assertTrue(self.triggers(mouse.on_button, [(DOWN, MIDDLE)], buttons=MIDDLE))
self.assertFalse(self.triggers(mouse.on_button, [(DOWN, MIDDLE)], buttons=MIDDLE, types=UP))
self.assertTrue(self.triggers(mouse.on_button, [(UP, MIDDLE)], buttons=MIDDLE, types=UP))
self.assertTrue(self.triggers(mouse.on_button, [(UP, MIDDLE)], buttons=[MIDDLE, LEFT], types=[UP, DOWN]))
self.assertTrue(self.triggers(mouse.on_button, [(DOWN, LEFT)], buttons=[MIDDLE, LEFT], types=[UP, DOWN]))
self.assertFalse(self.triggers(mouse.on_button, [(UP, X)], buttons=[MIDDLE, LEFT], types=[UP, DOWN]))
def test_ons(self):
self.assertTrue(self.triggers(mouse.on_click, [(UP, LEFT)]))
self.assertFalse(self.triggers(mouse.on_click, [(UP, RIGHT)]))
self.assertFalse(self.triggers(mouse.on_click, [(DOWN, LEFT)]))
self.assertFalse(self.triggers(mouse.on_click, [(DOWN, RIGHT)]))
self.assertTrue(self.triggers(mouse.on_double_click, [(DOUBLE, LEFT)]))
self.assertFalse(self.triggers(mouse.on_double_click, [(DOUBLE, RIGHT)]))
self.assertFalse(self.triggers(mouse.on_double_click, [(DOWN, RIGHT)]))
self.assertTrue(self.triggers(mouse.on_right_click, [(UP, RIGHT)]))
self.assertTrue(self.triggers(mouse.on_middle_click, [(UP, MIDDLE)]))
def test_wait(self):
# If this fails it blocks. Unfortunately, but I see no other way of testing.
from threading import Thread, Lock
lock = Lock()
lock.acquire()
def t():
mouse.wait()
lock.release()
Thread(target=t).start()
self.press()
lock.acquire()
def test_record_play(self):
from threading import Thread, Lock
lock = Lock()
lock.acquire()
def t():
self.recorded = mouse.record(RIGHT)
lock.release()
Thread(target=t).start()
self.click()
self.wheel(5)
self.move(100, 50)
self.press(RIGHT)
lock.acquire()
self.assertEqual(len(self.recorded), 5)
self.assertEqual(self.recorded[0]._replace(time=None), ButtonEvent(DOWN, LEFT, None))
self.assertEqual(self.recorded[1]._replace(time=None), ButtonEvent(UP, LEFT, None))
self.assertEqual(self.recorded[2]._replace(time=None), WheelEvent(5, None))
self.assertEqual(self.recorded[3]._replace(time=None), MoveEvent(100, 50, None))
self.assertEqual(self.recorded[4]._replace(time=None), ButtonEvent(DOWN, RIGHT, None))
mouse.play(self.recorded, speed_factor=0)
events = self.flush_events()
self.assertEqual(len(events), 5)
self.assertEqual(events[0], (DOWN, LEFT))
self.assertEqual(events[1], (UP, LEFT))
self.assertEqual(events[2], ('wheel', 5))
self.assertEqual(events[3], ('move', (100, 50)))
self.assertEqual(events[4], (DOWN, RIGHT))
mouse.play(self.recorded)
events = self.flush_events()
self.assertEqual(len(events), 5)
self.assertEqual(events[0], (DOWN, LEFT))
self.assertEqual(events[1], (UP, LEFT))
self.assertEqual(events[2], ('wheel', 5))
self.assertEqual(events[3], ('move', (100, 50)))
self.assertEqual(events[4], (DOWN, RIGHT))
mouse.play(self.recorded, include_clicks=False)
events = self.flush_events()
self.assertEqual(len(events), 2)
self.assertEqual(events[0], ('wheel', 5))
self.assertEqual(events[1], ('move', (100, 50)))
mouse.play(self.recorded, include_moves=False)
events = self.flush_events()
self.assertEqual(len(events), 4)
self.assertEqual(events[0], (DOWN, LEFT))
self.assertEqual(events[1], (UP, LEFT))
self.assertEqual(events[2], ('wheel', 5))
self.assertEqual(events[3], (DOWN, RIGHT))
mouse.play(self.recorded, include_wheel=False)
events = self.flush_events()
self.assertEqual(len(events), 4)
self.assertEqual(events[0], (DOWN, LEFT))
self.assertEqual(events[1], (UP, LEFT))
self.assertEqual(events[2], ('move', (100, 50)))
self.assertEqual(events[3], (DOWN, RIGHT))
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,174 @@
# -*- coding: utf-8 -*-
import struct
import os
import atexit
from time import time as now
from threading import Thread
from glob import glob
try:
from queue import Queue
except ImportError:
from Queue import Queue
event_bin_format = 'llHHI'
# Taken from include/linux/input.h
# https://www.kernel.org/doc/Documentation/input/event-codes.txt
EV_SYN = 0x00
EV_KEY = 0x01
EV_REL = 0x02
EV_ABS = 0x03
EV_MSC = 0x04
def make_uinput():
if not os.path.exists('/dev/uinput'):
raise IOError('No uinput module found.')
import fcntl, struct
# Requires uinput driver, but it's usually available.
uinput = open("/dev/uinput", 'wb')
UI_SET_EVBIT = 0x40045564
fcntl.ioctl(uinput, UI_SET_EVBIT, EV_KEY)
UI_SET_KEYBIT = 0x40045565
for i in range(256):
fcntl.ioctl(uinput, UI_SET_KEYBIT, i)
BUS_USB = 0x03
uinput_user_dev = "80sHHHHi64i64i64i64i"
axis = [0] * 64 * 4
uinput.write(struct.pack(uinput_user_dev, b"Virtual Keyboard", BUS_USB, 1, 1, 1, 0, *axis))
uinput.flush() # Without this you may get Errno 22: Invalid argument.
UI_DEV_CREATE = 0x5501
fcntl.ioctl(uinput, UI_DEV_CREATE)
UI_DEV_DESTROY = 0x5502
#fcntl.ioctl(uinput, UI_DEV_DESTROY)
return uinput
class EventDevice(object):
def __init__(self, path):
self.path = path
self._input_file = None
self._output_file = None
@property
def input_file(self):
if self._input_file is None:
try:
self._input_file = open(self.path, 'rb')
except IOError as e:
if e.strerror == 'Permission denied':
print('Permission denied ({}). You must be sudo to access global events.'.format(self.path))
exit()
def try_close():
try:
self._input_file.close
except:
pass
atexit.register(try_close)
return self._input_file
@property
def output_file(self):
if self._output_file is None:
self._output_file = open(self.path, 'wb')
atexit.register(self._output_file.close)
return self._output_file
def read_event(self):
data = self.input_file.read(struct.calcsize(event_bin_format))
seconds, microseconds, type, code, value = struct.unpack(event_bin_format, data)
return seconds + microseconds / 1e6, type, code, value, self.path
def write_event(self, type, code, value):
integer, fraction = divmod(now(), 1)
seconds = int(integer)
microseconds = int(fraction * 1e6)
data_event = struct.pack(event_bin_format, seconds, microseconds, type, code, value)
# Send a sync event to ensure other programs update.
sync_event = struct.pack(event_bin_format, seconds, microseconds, EV_SYN, 0, 0)
self.output_file.write(data_event + sync_event)
self.output_file.flush()
class AggregatedEventDevice(object):
def __init__(self, devices, output=None):
self.event_queue = Queue()
self.devices = devices
self.output = output or self.devices[0]
def start_reading(device):
while True:
self.event_queue.put(device.read_event())
for device in self.devices:
thread = Thread(target=start_reading, args=[device])
thread.setDaemon(True)
thread.start()
def read_event(self):
return self.event_queue.get(block=True)
def write_event(self, type, code, value):
self.output.write_event(type, code, value)
import re
from collections import namedtuple
DeviceDescription = namedtuple('DeviceDescription', 'event_file is_mouse is_keyboard')
device_pattern = r"""N: Name="([^"]+?)".+?H: Handlers=([^\n]+)"""
def list_devices_from_proc(type_name):
try:
with open('/proc/bus/input/devices') as f:
description = f.read()
except FileNotFoundError:
return
devices = {}
for name, handlers in re.findall(device_pattern, description, re.DOTALL):
path = '/dev/input/event' + re.search(r'event(\d+)', handlers).group(1)
if type_name in handlers:
yield EventDevice(path)
def list_devices_from_by_id(name_suffix, by_id=True):
for path in glob('/dev/input/{}/*-event-{}'.format('by-id' if by_id else 'by-path', name_suffix)):
yield EventDevice(path)
def aggregate_devices(type_name):
# Some systems have multiple keyboards with different range of allowed keys
# on each one, like a notebook with a "keyboard" device exclusive for the
# power button. Instead of figuring out which keyboard allows which key to
# send events, we create a fake device and send all events through there.
try:
uinput = make_uinput()
fake_device = EventDevice('uinput Fake Device')
fake_device._input_file = uinput
fake_device._output_file = uinput
except IOError as e:
import warnings
warnings.warn('Failed to create a device file using `uinput` module. Sending of events may be limited or unavailable depending on plugged-in devices.', stacklevel=2)
fake_device = None
# We don't aggregate devices from different sources to avoid
# duplicates.
devices_from_proc = list(list_devices_from_proc(type_name))
if devices_from_proc:
return AggregatedEventDevice(devices_from_proc, output=fake_device)
# breaks on mouse for virtualbox
# was getting /dev/input/by-id/usb-VirtualBox_USB_Tablet-event-mouse
devices_from_by_id = list(list_devices_from_by_id(type_name)) or list(list_devices_from_by_id(type_name, by_id=False))
if devices_from_by_id:
return AggregatedEventDevice(devices_from_by_id, output=fake_device)
# If no keyboards were found we can only use the fake device to send keys.
assert fake_device
return fake_device
def ensure_root():
if os.geteuid() != 0:
raise ImportError('You must be root to use this library on linux.')

@ -0,0 +1,183 @@
# -*- coding: utf-8 -*-
import struct
import traceback
from time import time as now
from collections import namedtuple
from ._keyboard_event import KeyboardEvent, KEY_DOWN, KEY_UP
from ._canonical_names import all_modifiers, normalize_name
from ._nixcommon import EV_KEY, aggregate_devices, ensure_root
# TODO: start by reading current keyboard state, as to not missing any already pressed keys.
# See: http://stackoverflow.com/questions/3649874/how-to-get-keyboard-state-in-linux
def cleanup_key(name):
""" Formats a dumpkeys format to our standard. """
name = name.lstrip('+')
is_keypad = name.startswith('KP_')
for mod in ('Meta_', 'Control_', 'dead_', 'KP_'):
if name.startswith(mod):
name = name[len(mod):]
# Dumpkeys is weird like that.
if name == 'Remove':
name = 'Delete'
elif name == 'Delete':
name = 'Backspace'
if name.endswith('_r'):
name = 'right ' + name[:-2]
if name.endswith('_l'):
name = 'left ' + name[:-2]
return normalize_name(name), is_keypad
def cleanup_modifier(modifier):
modifier = normalize_name(modifier)
if modifier in all_modifiers:
return modifier
if modifier[:-1] in all_modifiers:
return modifier[:-1]
raise ValueError('Unknown modifier {}'.format(modifier))
"""
Use `dumpkeys --keys-only` to list all scan codes and their names. We
then parse the output and built a table. For each scan code and modifiers we
have a list of names and vice-versa.
"""
from subprocess import check_output
from collections import defaultdict
import re
to_name = defaultdict(list)
from_name = defaultdict(list)
keypad_scan_codes = set()
def register_key(key_and_modifiers, name):
if name not in to_name[key_and_modifiers]:
to_name[key_and_modifiers].append(name)
if key_and_modifiers not in from_name[name]:
from_name[name].append(key_and_modifiers)
def build_tables():
if to_name and from_name: return
ensure_root()
modifiers_bits = {
'shift': 1,
'alt gr': 2,
'ctrl': 4,
'alt': 8,
}
keycode_template = r'^keycode\s+(\d+)\s+=(.*?)$'
dump = check_output(['dumpkeys', '--keys-only'], universal_newlines=True)
for str_scan_code, str_names in re.findall(keycode_template, dump, re.MULTILINE):
scan_code = int(str_scan_code)
for i, str_name in enumerate(str_names.strip().split()):
modifiers = tuple(sorted(modifier for modifier, bit in modifiers_bits.items() if i & bit))
name, is_keypad = cleanup_key(str_name)
register_key((scan_code, modifiers), name)
if is_keypad:
keypad_scan_codes.add(scan_code)
register_key((scan_code, modifiers), 'keypad ' + name)
# dumpkeys consistently misreports the Windows key, sometimes
# skipping it completely or reporting as 'alt. 125 = left win,
# 126 = right win.
if (125, ()) not in to_name or to_name[(125, ())] == 'alt':
register_key((125, ()), 'windows')
if (126, ()) not in to_name or to_name[(126, ())] == 'alt':
register_key((126, ()), 'windows')
# The menu key is usually skipped altogether, so we also add it manually.
if (127, ()) not in to_name:
register_key((127, ()), 'menu')
synonyms_template = r'^(\S+)\s+for (.+)$'
dump = check_output(['dumpkeys', '--long-info'], universal_newlines=True)
for synonym_str, original_str in re.findall(synonyms_template, dump, re.MULTILINE):
synonym, _ = cleanup_key(synonym_str)
original, _ = cleanup_key(original_str)
if synonym != original:
from_name[original].extend(from_name[synonym])
from_name[synonym].extend(from_name[original])
device = None
def build_device():
global device
if device: return
ensure_root()
device = aggregate_devices('kbd')
def init():
build_device()
build_tables()
pressed_modifiers = set()
def listen(callback):
build_device()
build_tables()
while True:
time, type, code, value, device_id = device.read_event()
if type != EV_KEY:
continue
scan_code = code
event_type = KEY_DOWN if value else KEY_UP # 0 = UP, 1 = DOWN, 2 = HOLD
pressed_modifiers_tuple = tuple(sorted(pressed_modifiers))
names = to_name[(scan_code, pressed_modifiers_tuple)] or to_name[(scan_code, ())] or ['unknown']
name = names[0]
if name in all_modifiers:
if event_type == KEY_DOWN:
pressed_modifiers.add(name)
else:
pressed_modifiers.discard(name)
is_keypad = scan_code in keypad_scan_codes
callback(KeyboardEvent(event_type=event_type, scan_code=scan_code, name=name, time=time, device=device_id, is_keypad=is_keypad, modifiers=pressed_modifiers_tuple))
def write_event(scan_code, is_down):
build_device()
device.write_event(EV_KEY, scan_code, int(is_down))
def map_name(name):
build_tables()
for entry in from_name[name]:
yield entry
parts = name.split(' ', 1)
if len(parts) > 1 and parts[0] in ('left', 'right'):
for entry in from_name[parts[1]]:
yield entry
def press(scan_code):
write_event(scan_code, True)
def release(scan_code):
write_event(scan_code, False)
def type_unicode(character):
codepoint = ord(character)
hexadecimal = hex(codepoint)[len('0x'):]
for key in ['ctrl', 'shift', 'u']:
scan_code, _ = next(map_name(key))
press(scan_code)
for key in hexadecimal:
scan_code, _ = next(map_name(key))
press(scan_code)
release(scan_code)
for key in ['ctrl', 'shift', 'u']:
scan_code, _ = next(map_name(key))
release(scan_code)
if __name__ == '__main__':
def p(e):
print(e)
listen(p)

@ -0,0 +1,130 @@
# -*- coding: utf-8 -*-
import struct
from subprocess import check_output
import re
from ._nixcommon import EV_KEY, EV_REL, EV_MSC, EV_SYN, EV_ABS, aggregate_devices, ensure_root
from ._mouse_event import ButtonEvent, WheelEvent, MoveEvent, LEFT, RIGHT, MIDDLE, X, X2, UP, DOWN
import ctypes
import ctypes.util
from ctypes import c_uint32, c_uint, c_int, byref
display = None
window = None
x11 = None
def build_display():
global display, window, x11
if display and window and x11: return
x11 = ctypes.cdll.LoadLibrary(ctypes.util.find_library('X11'))
# Required because we will have multiple threads calling x11,
# such as the listener thread and then main using "move_to".
x11.XInitThreads()
display = x11.XOpenDisplay(None)
# Known to cause segfault in Fedora 23 64bits, no known workarounds.
# http://stackoverflow.com/questions/35137007/get-mouse-position-on-linux-pure-python
window = x11.XDefaultRootWindow(display)
def get_position():
build_display()
root_id, child_id = c_uint32(), c_uint32()
root_x, root_y, win_x, win_y = c_int(), c_int(), c_int(), c_int()
mask = c_uint()
ret = x11.XQueryPointer(display, c_uint32(window), byref(root_id), byref(child_id),
byref(root_x), byref(root_y),
byref(win_x), byref(win_y), byref(mask))
return root_x.value, root_y.value
def move_to(x, y):
build_display()
x11.XWarpPointer(display, None, window, 0, 0, 0, 0, x, y)
x11.XFlush(display)
REL_X = 0x00
REL_Y = 0x01
REL_Z = 0x02
REL_HWHEEL = 0x06
REL_WHEEL = 0x08
ABS_X = 0x00
ABS_Y = 0x01
BTN_MOUSE = 0x110
BTN_LEFT = 0x110
BTN_RIGHT = 0x111
BTN_MIDDLE = 0x112
BTN_SIDE = 0x113
BTN_EXTRA = 0x114
button_by_code = {
BTN_LEFT: LEFT,
BTN_RIGHT: RIGHT,
BTN_MIDDLE: MIDDLE,
BTN_SIDE: X,
BTN_EXTRA: X2,
}
code_by_button = {button: code for code, button in button_by_code.items()}
device = None
def build_device():
global device
if device: return
ensure_root()
device = aggregate_devices('mouse')
init = build_device
def listen(queue):
build_device()
while True:
time, type, code, value, device_id = device.read_event()
if type == EV_SYN or type == EV_MSC:
continue
event = None
arg = None
if type == EV_KEY:
event = ButtonEvent(DOWN if value else UP, button_by_code.get(code, '?'), time)
elif type == EV_REL:
value, = struct.unpack('i', struct.pack('I', value))
if code == REL_WHEEL:
event = WheelEvent(value, time)
elif code in (REL_X, REL_Y):
x, y = get_position()
event = MoveEvent(x, y, time)
if event is None:
# Unknown event type.
continue
queue.put(event)
def press(button=LEFT):
build_device()
device.write_event(EV_KEY, code_by_button[button], 0x01)
def release(button=LEFT):
build_device()
device.write_event(EV_KEY, code_by_button[button], 0x00)
def move_relative(x, y):
build_device()
# Note relative events are not in terms of pixels, but millimeters.
if x < 0:
x += 2**32
if y < 0:
y += 2**32
device.write_event(EV_REL, REL_X, x)
device.write_event(EV_REL, REL_Y, y)
def wheel(delta=1):
build_device()
if delta < 0:
delta += 2**32
device.write_event(EV_REL, REL_WHEEL, delta)
if __name__ == '__main__':
#listen(print)
move_to(100, 200)

@ -0,0 +1,620 @@
# -*- coding: utf-8 -*-
"""
This is the Windows backend for keyboard events, and is implemented by
invoking the Win32 API through the ctypes module. This is error prone
and can introduce very unpythonic failure modes, such as segfaults and
low level memory leaks. But it is also dependency-free, very performant
well documented on Microsoft's webstie and scattered examples.
# TODO:
- Keypad numbers still print as numbers even when numlock is off.
- No way to specify if user wants a keypad key or not in `map_char`.
"""
from __future__ import unicode_literals
import re
import atexit
import traceback
from threading import Lock
from collections import defaultdict
from ._keyboard_event import KeyboardEvent, KEY_DOWN, KEY_UP
from ._canonical_names import normalize_name
try:
# Force Python2 to convert to unicode and not to str.
chr = unichr
except NameError:
pass
# This part is just declaring Win32 API structures using ctypes. In C
# this would be simply #include "windows.h".
import ctypes
from ctypes import c_short, c_char, c_uint8, c_int32, c_int, c_uint, c_uint32, c_long, Structure, CFUNCTYPE, POINTER
from ctypes.wintypes import WORD, DWORD, BOOL, HHOOK, MSG, LPWSTR, WCHAR, WPARAM, LPARAM, LONG, HMODULE, LPCWSTR, HINSTANCE, HWND
LPMSG = POINTER(MSG)
ULONG_PTR = POINTER(DWORD)
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
GetModuleHandleW = kernel32.GetModuleHandleW
GetModuleHandleW.restype = HMODULE
GetModuleHandleW.argtypes = [LPCWSTR]
#https://github.com/boppreh/mouse/issues/1
#user32 = ctypes.windll.user32
user32 = ctypes.WinDLL('user32', use_last_error = True)
VK_PACKET = 0xE7
INPUT_MOUSE = 0
INPUT_KEYBOARD = 1
INPUT_HARDWARE = 2
KEYEVENTF_KEYUP = 0x02
KEYEVENTF_UNICODE = 0x04
class KBDLLHOOKSTRUCT(Structure):
_fields_ = [("vk_code", DWORD),
("scan_code", DWORD),
("flags", DWORD),
("time", c_int),
("dwExtraInfo", ULONG_PTR)]
# Included for completeness.
class MOUSEINPUT(ctypes.Structure):
_fields_ = (('dx', LONG),
('dy', LONG),
('mouseData', DWORD),
('dwFlags', DWORD),
('time', DWORD),
('dwExtraInfo', ULONG_PTR))
class KEYBDINPUT(ctypes.Structure):
_fields_ = (('wVk', WORD),
('wScan', WORD),
('dwFlags', DWORD),
('time', DWORD),
('dwExtraInfo', ULONG_PTR))
class HARDWAREINPUT(ctypes.Structure):
_fields_ = (('uMsg', DWORD),
('wParamL', WORD),
('wParamH', WORD))
class _INPUTunion(ctypes.Union):
_fields_ = (('mi', MOUSEINPUT),
('ki', KEYBDINPUT),
('hi', HARDWAREINPUT))
class INPUT(ctypes.Structure):
_fields_ = (('type', DWORD),
('union', _INPUTunion))
LowLevelKeyboardProc = CFUNCTYPE(c_int, WPARAM, LPARAM, POINTER(KBDLLHOOKSTRUCT))
SetWindowsHookEx = user32.SetWindowsHookExW
SetWindowsHookEx.argtypes = [c_int, LowLevelKeyboardProc, HINSTANCE , DWORD]
SetWindowsHookEx.restype = HHOOK
CallNextHookEx = user32.CallNextHookEx
#CallNextHookEx.argtypes = [c_int , c_int, c_int, POINTER(KBDLLHOOKSTRUCT)]
CallNextHookEx.restype = c_int
UnhookWindowsHookEx = user32.UnhookWindowsHookEx
UnhookWindowsHookEx.argtypes = [HHOOK]
UnhookWindowsHookEx.restype = BOOL
GetMessage = user32.GetMessageW
GetMessage.argtypes = [LPMSG, HWND, c_uint, c_uint]
GetMessage.restype = BOOL
TranslateMessage = user32.TranslateMessage
TranslateMessage.argtypes = [LPMSG]
TranslateMessage.restype = BOOL
DispatchMessage = user32.DispatchMessageA
DispatchMessage.argtypes = [LPMSG]
keyboard_state_type = c_uint8 * 256
GetKeyboardState = user32.GetKeyboardState
GetKeyboardState.argtypes = [keyboard_state_type]
GetKeyboardState.restype = BOOL
GetKeyNameText = user32.GetKeyNameTextW
GetKeyNameText.argtypes = [c_long, LPWSTR, c_int]
GetKeyNameText.restype = c_int
MapVirtualKey = user32.MapVirtualKeyW
MapVirtualKey.argtypes = [c_uint, c_uint]
MapVirtualKey.restype = c_uint
ToUnicode = user32.ToUnicode
ToUnicode.argtypes = [c_uint, c_uint, keyboard_state_type, LPWSTR, c_int, c_uint]
ToUnicode.restype = c_int
SendInput = user32.SendInput
SendInput.argtypes = [c_uint, POINTER(INPUT), c_int]
SendInput.restype = c_uint
# https://msdn.microsoft.com/en-us/library/windows/desktop/ms646307(v=vs.85).aspx
MAPVK_VK_TO_CHAR = 2
MAPVK_VK_TO_VSC = 0
MAPVK_VSC_TO_VK = 1
MAPVK_VK_TO_VSC_EX = 4
MAPVK_VSC_TO_VK_EX = 3
VkKeyScan = user32.VkKeyScanW
VkKeyScan.argtypes = [WCHAR]
VkKeyScan.restype = c_short
LLKHF_INJECTED = 0x00000010
WM_KEYDOWN = 0x0100
WM_KEYUP = 0x0101
WM_SYSKEYDOWN = 0x104 # Used for ALT key
WM_SYSKEYUP = 0x105
# This marks the end of Win32 API declarations. The rest is ours.
keyboard_event_types = {
WM_KEYDOWN: KEY_DOWN,
WM_KEYUP: KEY_UP,
WM_SYSKEYDOWN: KEY_DOWN,
WM_SYSKEYUP: KEY_UP,
}
# List taken from the official documentation, but stripped of the OEM-specific keys.
# Keys are virtual key codes, values are pairs (name, is_keypad).
official_virtual_keys = {
0x03: ('control-break processing', False),
0x08: ('backspace', False),
0x09: ('tab', False),
0x0c: ('clear', False),
0x0d: ('enter', False),
0x10: ('shift', False),
0x11: ('ctrl', False),
0x12: ('alt', False),
0x13: ('pause', False),
0x14: ('caps lock', False),
0x15: ('ime kana mode', False),
0x15: ('ime hanguel mode', False),
0x15: ('ime hangul mode', False),
0x17: ('ime junja mode', False),
0x18: ('ime final mode', False),
0x19: ('ime hanja mode', False),
0x19: ('ime kanji mode', False),
0x1b: ('esc', False),
0x1c: ('ime convert', False),
0x1d: ('ime nonconvert', False),
0x1e: ('ime accept', False),
0x1f: ('ime mode change request', False),
0x20: ('spacebar', False),
0x21: ('page up', False),
0x22: ('page down', False),
0x23: ('end', False),
0x24: ('home', False),
0x25: ('left', False),
0x26: ('up', False),
0x27: ('right', False),
0x28: ('down', False),
0x29: ('select', False),
0x2a: ('print', False),
0x2b: ('execute', False),
0x2c: ('print screen', False),
0x2d: ('insert', False),
0x2e: ('delete', False),
0x2f: ('help', False),
0x30: ('0', False),
0x31: ('1', False),
0x32: ('2', False),
0x33: ('3', False),
0x34: ('4', False),
0x35: ('5', False),
0x36: ('6', False),
0x37: ('7', False),
0x38: ('8', False),
0x39: ('9', False),
0x41: ('a', False),
0x42: ('b', False),
0x43: ('c', False),
0x44: ('d', False),
0x45: ('e', False),
0x46: ('f', False),
0x47: ('g', False),
0x48: ('h', False),
0x49: ('i', False),
0x4a: ('j', False),
0x4b: ('k', False),
0x4c: ('l', False),
0x4d: ('m', False),
0x4e: ('n', False),
0x4f: ('o', False),
0x50: ('p', False),
0x51: ('q', False),
0x52: ('r', False),
0x53: ('s', False),
0x54: ('t', False),
0x55: ('u', False),
0x56: ('v', False),
0x57: ('w', False),
0x58: ('x', False),
0x59: ('y', False),
0x5a: ('z', False),
0x5b: ('left windows', False),
0x5c: ('right windows', False),
0x5d: ('applications', False),
0x5f: ('sleep', False),
0x60: ('0', True),
0x61: ('1', True),
0x62: ('2', True),
0x63: ('3', True),
0x64: ('4', True),
0x65: ('5', True),
0x66: ('6', True),
0x67: ('7', True),
0x68: ('8', True),
0x69: ('9', True),
0x6a: ('*', True),
0x6b: ('+', True),
0x6c: ('separator', True),
0x6d: ('-', True),
0x6e: ('decimal', True),
0x6f: ('/', True),
0x70: ('f1', False),
0x71: ('f2', False),
0x72: ('f3', False),
0x73: ('f4', False),
0x74: ('f5', False),
0x75: ('f6', False),
0x76: ('f7', False),
0x77: ('f8', False),
0x78: ('f9', False),
0x79: ('f10', False),
0x7a: ('f11', False),
0x7b: ('f12', False),
0x7c: ('f13', False),
0x7d: ('f14', False),
0x7e: ('f15', False),
0x7f: ('f16', False),
0x80: ('f17', False),
0x81: ('f18', False),
0x82: ('f19', False),
0x83: ('f20', False),
0x84: ('f21', False),
0x85: ('f22', False),
0x86: ('f23', False),
0x87: ('f24', False),
0x90: ('num lock', False),
0x91: ('scroll lock', False),
0xa0: ('left shift', False),
0xa1: ('right shift', False),
0xa2: ('left ctrl', False),
0xa3: ('right ctrl', False),
0xa4: ('left menu', False),
0xa5: ('right menu', False),
0xa6: ('browser back', False),
0xa7: ('browser forward', False),
0xa8: ('browser refresh', False),
0xa9: ('browser stop', False),
0xaa: ('browser search key', False),
0xab: ('browser favorites', False),
0xac: ('browser start and home', False),
0xad: ('volume mute', False),
0xae: ('volume down', False),
0xaf: ('volume up', False),
0xb0: ('next track', False),
0xb1: ('previous track', False),
0xb2: ('stop media', False),
0xb3: ('play/pause media', False),
0xb4: ('start mail', False),
0xb5: ('select media', False),
0xb6: ('start application 1', False),
0xb7: ('start application 2', False),
0xbb: ('+', False),
0xbc: (',', False),
0xbd: ('-', False),
0xbe: ('.', False),
#0xbe:('/', False), # Used for miscellaneous characters; it can vary by keyboard. For the US standard keyboard, the '/?.
0xe5: ('ime process', False),
0xf6: ('attn', False),
0xf7: ('crsel', False),
0xf8: ('exsel', False),
0xf9: ('erase eof', False),
0xfa: ('play', False),
0xfb: ('zoom', False),
0xfc: ('reserved ', False),
0xfd: ('pa1', False),
0xfe: ('clear', False),
}
tables_lock = Lock()
to_name = defaultdict(list)
from_name = defaultdict(list)
scan_code_to_vk = {}
distinct_modifiers = [
(),
('shift',),
('alt gr',),
('num lock',),
('shift', 'num lock'),
('caps lock',),
('shift', 'caps lock'),
('alt gr', 'num lock'),
]
name_buffer = ctypes.create_unicode_buffer(32)
unicode_buffer = ctypes.create_unicode_buffer(32)
keyboard_state = keyboard_state_type()
def get_event_names(scan_code, vk, is_extended, modifiers):
is_keypad = (scan_code, vk, is_extended) in keypad_keys
is_official = vk in official_virtual_keys
if is_keypad and is_official:
yield official_virtual_keys[vk][0]
keyboard_state[0x10] = 0x80 * ('shift' in modifiers)
keyboard_state[0x11] = 0x80 * ('alt gr' in modifiers)
keyboard_state[0x12] = 0x80 * ('alt gr' in modifiers)
keyboard_state[0x14] = 0x01 * ('caps lock' in modifiers)
keyboard_state[0x90] = 0x01 * ('num lock' in modifiers)
keyboard_state[0x91] = 0x01 * ('scroll lock' in modifiers)
unicode_ret = ToUnicode(vk, scan_code, keyboard_state, unicode_buffer, len(unicode_buffer), 0)
if unicode_ret and unicode_buffer.value:
yield unicode_buffer.value
# unicode_ret == -1 -> is dead key
# ToUnicode has the side effect of setting global flags for dead keys.
# Therefore we need to call it twice to clear those flags.
# If your 6 and 7 keys are named "^6" and "^7", this is the reason.
ToUnicode(vk, scan_code, keyboard_state, unicode_buffer, len(unicode_buffer), 0)
name_ret = GetKeyNameText(scan_code << 16 | is_extended << 24, name_buffer, 1024)
if name_ret and name_buffer.value:
yield name_buffer.value
char = user32.MapVirtualKeyW(vk, MAPVK_VK_TO_CHAR) & 0xFF
if char != 0:
yield chr(char)
if not is_keypad and is_official:
yield official_virtual_keys[vk][0]
def _setup_name_tables():
"""
Ensures the scan code/virtual key code/name translation tables are
filled.
"""
with tables_lock:
if to_name: return
# Go through every possible scan code, and map them to virtual key codes.
# Then vice-versa.
all_scan_codes = [(sc, user32.MapVirtualKeyExW(sc, MAPVK_VSC_TO_VK_EX, 0)) for sc in range(0x100)]
all_vks = [(user32.MapVirtualKeyExW(vk, MAPVK_VK_TO_VSC_EX, 0), vk) for vk in range(0x100)]
for scan_code, vk in all_scan_codes + all_vks:
# `to_name` and `from_name` entries will be a tuple (scan_code, vk, extended, shift_state).
if (scan_code, vk, 0, 0, 0) in to_name:
continue
if scan_code not in scan_code_to_vk:
scan_code_to_vk[scan_code] = vk
# Brute force all combinations to find all possible names.
for extended in [0, 1]:
for modifiers in distinct_modifiers:
entry = (scan_code, vk, extended, modifiers)
# Get key names from ToUnicode, GetKeyNameText, MapVirtualKeyW and official virtual keys.
names = list(get_event_names(*entry))
if names:
# Also map lowercased key names, but only after the properly cased ones.
lowercase_names = [name.lower() for name in names]
to_name[entry] = names + lowercase_names
# Remember the "id" of the name, as the first techniques
# have better results and therefore priority.
for i, name in enumerate(map(normalize_name, names + lowercase_names)):
from_name[name].append((i, entry))
# TODO: single quotes on US INTL is returning the dead key (?), and therefore
# not typing properly.
# Alt gr is way outside the usual range of keys (0..127) and on my
# computer is named as 'ctrl'. Therefore we add it manually and hope
# Windows is consistent in its inconsistency.
for extended in [0, 1]:
for modifiers in distinct_modifiers:
to_name[(541, 162, extended, modifiers)] = ['alt gr']
from_name['alt gr'].append((1, (541, 162, extended, modifiers)))
modifiers_preference = defaultdict(lambda: 10)
modifiers_preference.update({(): 0, ('shift',): 1, ('alt gr',): 2, ('ctrl',): 3, ('alt',): 4})
def order_key(line):
i, entry = line
scan_code, vk, extended, modifiers = entry
return modifiers_preference[modifiers], i, extended, vk, scan_code
for name, entries in list(from_name.items()):
from_name[name] = sorted(set(entries), key=order_key)
# Called by keyboard/__init__.py
init = _setup_name_tables
# List created manually.
keypad_keys = [
# (scan_code, virtual_key_code, is_extended)
(126, 194, 0),
(126, 194, 0),
(28, 13, 1),
(28, 13, 1),
(53, 111, 1),
(53, 111, 1),
(55, 106, 0),
(55, 106, 0),
(69, 144, 1),
(69, 144, 1),
(71, 103, 0),
(71, 36, 0),
(72, 104, 0),
(72, 38, 0),
(73, 105, 0),
(73, 33, 0),
(74, 109, 0),
(74, 109, 0),
(75, 100, 0),
(75, 37, 0),
(76, 101, 0),
(76, 12, 0),
(77, 102, 0),
(77, 39, 0),
(78, 107, 0),
(78, 107, 0),
(79, 35, 0),
(79, 97, 0),
(80, 40, 0),
(80, 98, 0),
(81, 34, 0),
(81, 99, 0),
(82, 45, 0),
(82, 96, 0),
(83, 110, 0),
(83, 46, 0),
]
shift_is_pressed = False
altgr_is_pressed = False
ignore_next_right_alt = False
shift_vks = set([0x10, 0xa0, 0xa1])
def prepare_intercept(callback):
"""
Registers a Windows low level keyboard hook. The provided callback will
be invoked for each high-level keyboard event, and is expected to return
True if the key event should be passed to the next program, or False if
the event is to be blocked.
No event is processed until the Windows messages are pumped (see
start_intercept).
"""
_setup_name_tables()
def process_key(event_type, vk, scan_code, is_extended):
global shift_is_pressed, altgr_is_pressed, ignore_next_right_alt
#print(event_type, vk, scan_code, is_extended)
# Pressing alt-gr also generates an extra "right alt" event
if vk == 0xA5 and ignore_next_right_alt:
ignore_next_right_alt = False
return True
modifiers = (
('shift',) * shift_is_pressed +
('alt gr',) * altgr_is_pressed +
('num lock',) * (user32.GetKeyState(0x90) & 1) +
('caps lock',) * (user32.GetKeyState(0x14) & 1) +
('scroll lock',) * (user32.GetKeyState(0x91) & 1)
)
entry = (scan_code, vk, is_extended, modifiers)
if entry not in to_name:
to_name[entry] = list(get_event_names(*entry))
names = to_name[entry]
name = names[0] if names else None
# TODO: inaccurate when holding multiple different shifts.
if vk in shift_vks:
shift_is_pressed = event_type == KEY_DOWN
if scan_code == 541 and vk == 162:
ignore_next_right_alt = True
altgr_is_pressed = event_type == KEY_DOWN
is_keypad = (scan_code, vk, is_extended) in keypad_keys
return callback(KeyboardEvent(event_type=event_type, scan_code=scan_code or -vk, name=name, is_keypad=is_keypad))
def low_level_keyboard_handler(nCode, wParam, lParam):
try:
vk = lParam.contents.vk_code
# Ignore the second `alt` DOWN observed in some cases.
fake_alt = (LLKHF_INJECTED | 0x20)
# Ignore events generated by SendInput with Unicode.
if vk != VK_PACKET and lParam.contents.flags & fake_alt != fake_alt:
event_type = keyboard_event_types[wParam]
is_extended = lParam.contents.flags & 1
scan_code = lParam.contents.scan_code
should_continue = process_key(event_type, vk, scan_code, is_extended)
if not should_continue:
return -1
except Exception as e:
print('Error in keyboard hook:')
traceback.print_exc()
return CallNextHookEx(None, nCode, wParam, lParam)
WH_KEYBOARD_LL = c_int(13)
keyboard_callback = LowLevelKeyboardProc(low_level_keyboard_handler)
handle = GetModuleHandleW(None)
thread_id = DWORD(0)
keyboard_hook = SetWindowsHookEx(WH_KEYBOARD_LL, keyboard_callback, handle, thread_id)
# Register to remove the hook when the interpreter exits. Unfortunately a
# try/finally block doesn't seem to work here.
atexit.register(UnhookWindowsHookEx, keyboard_callback)
def listen(callback):
prepare_intercept(callback)
msg = LPMSG()
while not GetMessage(msg, 0, 0, 0):
TranslateMessage(msg)
DispatchMessage(msg)
def map_name(name):
_setup_name_tables()
entries = from_name.get(name)
if not entries:
raise ValueError('Key name {} is not mapped to any known key.'.format(repr(name)))
for i, entry in entries:
scan_code, vk, is_extended, modifiers = entry
yield scan_code or -vk, modifiers
def _send_event(code, event_type):
if code == 541:
# Alt-gr is made of ctrl+alt. Just sending even 541 doesn't do anything.
user32.keybd_event(0x11, code, event_type, 0)
user32.keybd_event(0x12, code, event_type, 0)
elif code > 0:
vk = scan_code_to_vk.get(code, 0)
user32.keybd_event(vk, code, event_type, 0)
else:
# Negative scan code is a way to indicate we don't have a scan code,
# and the value actually contains the Virtual key code.
user32.keybd_event(-code, 0, event_type, 0)
def press(code):
_send_event(code, 0)
def release(code):
_send_event(code, 2)
def type_unicode(character):
# This code and related structures are based on
# http://stackoverflow.com/a/11910555/252218
surrogates = bytearray(character.encode('utf-16le'))
presses = []
releases = []
for i in range(0, len(surrogates), 2):
higher, lower = surrogates[i:i+2]
structure = KEYBDINPUT(0, (lower << 8) + higher, KEYEVENTF_UNICODE, 0, None)
presses.append(INPUT(INPUT_KEYBOARD, _INPUTunion(ki=structure)))
structure = KEYBDINPUT(0, (lower << 8) + higher, KEYEVENTF_UNICODE | KEYEVENTF_KEYUP, 0, None)
releases.append(INPUT(INPUT_KEYBOARD, _INPUTunion(ki=structure)))
inputs = presses + releases
nInputs = len(inputs)
LPINPUT = INPUT * nInputs
pInputs = LPINPUT(*inputs)
cbSize = c_int(ctypes.sizeof(INPUT))
SendInput(nInputs, pInputs, cbSize)
if __name__ == '__main__':
_setup_name_tables()
import pprint
pprint.pprint(to_name)
pprint.pprint(from_name)
#listen(lambda e: print(e.to_json()) or True)

@ -0,0 +1,201 @@
# -*- coding: utf-8 -*-
import ctypes
import time
from ctypes import c_short, c_char, c_uint8, c_int32, c_int, c_uint, c_uint32, c_long, byref, Structure, CFUNCTYPE, POINTER
from ctypes.wintypes import DWORD, BOOL, HHOOK, MSG, LPWSTR, WCHAR, WPARAM, LPARAM
LPMSG = POINTER(MSG)
import atexit
from ._mouse_event import ButtonEvent, WheelEvent, MoveEvent, LEFT, RIGHT, MIDDLE, X, X2, UP, DOWN, DOUBLE, WHEEL, HORIZONTAL, VERTICAL
#https://github.com/boppreh/mouse/issues/1
#user32 = ctypes.windll.user32
user32 = ctypes.WinDLL('user32', use_last_error = True)
class MSLLHOOKSTRUCT(Structure):
_fields_ = [("x", c_long),
("y", c_long),
('data', c_int32),
('reserved', c_int32),
("flags", DWORD),
("time", c_int),
]
LowLevelMouseProc = CFUNCTYPE(c_int, WPARAM, LPARAM, POINTER(MSLLHOOKSTRUCT))
SetWindowsHookEx = user32.SetWindowsHookExA
#SetWindowsHookEx.argtypes = [c_int, LowLevelMouseProc, c_int, c_int]
SetWindowsHookEx.restype = HHOOK
CallNextHookEx = user32.CallNextHookEx
#CallNextHookEx.argtypes = [c_int , c_int, c_int, POINTER(MSLLHOOKSTRUCT)]
CallNextHookEx.restype = c_int
UnhookWindowsHookEx = user32.UnhookWindowsHookEx
UnhookWindowsHookEx.argtypes = [HHOOK]
UnhookWindowsHookEx.restype = BOOL
GetMessage = user32.GetMessageW
GetMessage.argtypes = [LPMSG, c_int, c_int, c_int]
GetMessage.restype = BOOL
TranslateMessage = user32.TranslateMessage
TranslateMessage.argtypes = [LPMSG]
TranslateMessage.restype = BOOL
DispatchMessage = user32.DispatchMessageA
DispatchMessage.argtypes = [LPMSG]
# Beware, as of 2016-01-30 the official docs have a very incomplete list.
# This one was compiled from experience and may be incomplete.
WM_MOUSEMOVE = 0x200
WM_LBUTTONDOWN = 0x201
WM_LBUTTONUP = 0x202
WM_LBUTTONDBLCLK = 0x203
WM_RBUTTONDOWN = 0x204
WM_RBUTTONUP = 0x205
WM_RBUTTONDBLCLK = 0x206
WM_MBUTTONDOWN = 0x207
WM_MBUTTONUP = 0x208
WM_MBUTTONDBLCLK = 0x209
WM_MOUSEWHEEL = 0x20A
WM_XBUTTONDOWN = 0x20B
WM_XBUTTONUP = 0x20C
WM_XBUTTONDBLCLK = 0x20D
WM_NCXBUTTONDOWN = 0x00AB
WM_NCXBUTTONUP = 0x00AC
WM_NCXBUTTONDBLCLK = 0x00AD
WM_MOUSEHWHEEL = 0x20E
WM_LBUTTONDOWN = 0x0201
WM_LBUTTONUP = 0x0202
WM_MOUSEMOVE = 0x0200
WM_MOUSEWHEEL = 0x020A
WM_MOUSEHWHEEL = 0x020E
WM_RBUTTONDOWN = 0x0204
WM_RBUTTONUP = 0x0205
buttons_by_wm_code = {
WM_LBUTTONDOWN: (DOWN, LEFT),
WM_LBUTTONUP: (UP, LEFT),
WM_LBUTTONDBLCLK: (DOUBLE, LEFT),
WM_RBUTTONDOWN: (DOWN, RIGHT),
WM_RBUTTONUP: (UP, RIGHT),
WM_RBUTTONDBLCLK: (DOUBLE, RIGHT),
WM_MBUTTONDOWN: (DOWN, MIDDLE),
WM_MBUTTONUP: (UP, MIDDLE),
WM_MBUTTONDBLCLK: (DOUBLE, MIDDLE),
WM_XBUTTONDOWN: (DOWN, X),
WM_XBUTTONUP: (UP, X),
WM_XBUTTONDBLCLK: (DOUBLE, X),
}
MOUSEEVENTF_ABSOLUTE = 0x8000
MOUSEEVENTF_MOVE = 0x1
MOUSEEVENTF_WHEEL = 0x800
MOUSEEVENTF_HWHEEL = 0x1000
MOUSEEVENTF_LEFTDOWN = 0x2
MOUSEEVENTF_LEFTUP = 0x4
MOUSEEVENTF_RIGHTDOWN = 0x8
MOUSEEVENTF_RIGHTUP = 0x10
MOUSEEVENTF_MIDDLEDOWN = 0x20
MOUSEEVENTF_MIDDLEUP = 0x40
MOUSEEVENTF_XDOWN = 0x0080
MOUSEEVENTF_XUP = 0x0100
simulated_mouse_codes = {
(WHEEL, HORIZONTAL): MOUSEEVENTF_HWHEEL,
(WHEEL, VERTICAL): MOUSEEVENTF_WHEEL,
(DOWN, LEFT): MOUSEEVENTF_LEFTDOWN,
(UP, LEFT): MOUSEEVENTF_LEFTUP,
(DOWN, RIGHT): MOUSEEVENTF_RIGHTDOWN,
(UP, RIGHT): MOUSEEVENTF_RIGHTUP,
(DOWN, MIDDLE): MOUSEEVENTF_MIDDLEDOWN,
(UP, MIDDLE): MOUSEEVENTF_MIDDLEUP,
(DOWN, X): MOUSEEVENTF_XDOWN,
(UP, X): MOUSEEVENTF_XUP,
}
NULL = c_int(0)
WHEEL_DELTA = 120
init = lambda: None
def listen(queue):
def low_level_mouse_handler(nCode, wParam, lParam):
struct = lParam.contents
# Can't use struct.time because it's usually zero.
t = time.time()
if wParam == WM_MOUSEMOVE:
event = MoveEvent(struct.x, struct.y, t)
elif wParam == WM_MOUSEWHEEL:
event = WheelEvent(struct.data / (WHEEL_DELTA * (2<<15)), t)
elif wParam in buttons_by_wm_code:
type, button = buttons_by_wm_code.get(wParam, ('?', '?'))
if wParam >= WM_XBUTTONDOWN:
button = {0x10000: X, 0x20000: X2}[struct.data]
event = ButtonEvent(type, button, t)
queue.put(event)
return CallNextHookEx(NULL, nCode, wParam, lParam)
WH_MOUSE_LL = c_int(14)
mouse_callback = LowLevelMouseProc(low_level_mouse_handler)
mouse_hook = SetWindowsHookEx(WH_MOUSE_LL, mouse_callback, NULL, NULL)
# Register to remove the hook when the interpreter exits. Unfortunately a
# try/finally block doesn't seem to work here.
atexit.register(UnhookWindowsHookEx, mouse_hook)
msg = LPMSG()
while not GetMessage(msg, NULL, NULL, NULL):
TranslateMessage(msg)
DispatchMessage(msg)
def _translate_button(button):
if button == X or button == X2:
return X, {X: 0x10000, X2: 0x20000}[button]
else:
return button, 0
def press(button=LEFT):
button, data = _translate_button(button)
code = simulated_mouse_codes[(DOWN, button)]
user32.mouse_event(code, 0, 0, data, 0)
def release(button=LEFT):
button, data = _translate_button(button)
code = simulated_mouse_codes[(UP, button)]
user32.mouse_event(code, 0, 0, data, 0)
def wheel(delta=1):
code = simulated_mouse_codes[(WHEEL, VERTICAL)]
user32.mouse_event(code, 0, 0, int(delta * WHEEL_DELTA), 0)
def move_to(x, y):
user32.SetCursorPos(int(x), int(y))
def move_relative(x, y):
user32.mouse_event(MOUSEEVENTF_MOVE, int(x), int(y), 0, 0)
class POINT(Structure):
_fields_ = [("x", c_long), ("y", c_long)]
def get_position():
point = POINT()
user32.GetCursorPos(byref(point))
return (point.x, point.y)
if __name__ == '__main__':
def p(e):
print(e)
listen(p)

@ -0,0 +1,119 @@
from collections import defaultdict
from Xlib import X, XK, display
from Xlib.ext import record
from Xlib.protocol import rq
from ._keyboard_event import KeyboardEvent, KEY_DOWN, KEY_UP, normalize_name
# from ._nixkeyboard import init
def cleanup_key(name):
if name.startswith('XK_'):
name = name[3:]
if name.startswith('KP_'):
is_keypad = True
name = name[3:]
else:
is_keypad = False
if name.endswith('_R'):
name = 'right ' + name[:-2]
if name.endswith('_L'):
name = 'left ' + name[:-2]
return normalize_name(name), is_keypad
keysym_to_keys = defaultdict(list)
name_to_keysyms = defaultdict(list)
for raw_name in dir(XK):
if not raw_name.startswith('XK_'): continue
keysym = getattr(XK, raw_name)
name, is_keypad = cleanup_key(raw_name)
keysym_to_keys[keysym].append((name, is_keypad))
name_to_keysyms[name].append(keysym)
local_dpy = None
record_dpy = None
ctx = None
def init():
# Adapted from https://github.com/python-xlib/python-xlib/blob/master/examples/record_demo.py
global local_dpy
global record_dpy
local_dpy = display.Display()
record_dpy = display.Display()
if not record_dpy.has_extension("RECORD"):
raise ImportError("RECORD extension not found")
r = record_dpy.record_get_version(0, 0)
# print("RECORD extension version %d.%d" % (r.major_version, r.minor_version))
# Create a recording context; we only want key and mouse events
global ctx
ctx = record_dpy.record_create_context(
0,
[record.AllClients],
[
{
'core_requests': (0, 0),
'core_replies': (0, 0),
'ext_requests': (0, 0, 0, 0),
'ext_replies': (0, 0, 0, 0),
'delivered_events': (0, 0),
'device_events': (X.KeyPress, X.KeyPress),
'errors': (0, 0),
'client_started': False,
'client_died': False,
}
]
)
def listen(callback):
def handler(reply):
if reply.category != record.FromServer:
return
if reply.client_swapped:
print("* received swapped protocol data, cowardly ignored")
return
if not len(reply.data) or reply.data[0] < 2:
# not an event
return
data = reply.data
while len(data):
raw_event, data = rq.EventField(None).parse_binary_value(data, record_dpy.display, None, None)
event_type = {X.KeyPress: KEY_DOWN, X.KeyRelease: KEY_UP}.get(raw_event.type)
if event_type:
keysym = local_dpy.keycode_to_keysym(raw_event.detail, 0)
# TODO: scan code is not correct.
if not keysym:
event = KeyboardEvent(event_type=event_type, scan_code=raw_event.detail)
else:
try:
name, is_keypad = keysym_to_keys[keysym][0]
except IndexError:
name, is_keypad = None, None
event = KeyboardEvent(event_type=event_type, scan_code=keysym, name=name, is_keypad=is_keypad)
callback(event)
try:
# Enable the context; this only returns after a call to record_disable_context,
# while calling the callback function in the meantime
record_dpy.record_enable_context(ctx, handler)
finally:
# Finally free the context
record_dpy.record_free_context(ctx)
def map_name(name):
for keysym in name_to_keysyms[name]:
yield keysym, ()

@ -0,0 +1,232 @@
# -*- coding: utf-8 -*-
import warnings
warnings.simplefilter('always', DeprecationWarning)
warnings.warn('The mouse sub-library is deprecated and will be removed in future versions. Please use the standalone package `mouse`.', DeprecationWarning, stacklevel=2)
import time as _time
import platform as _platform
if _platform.system() == 'Windows':
from. import _winmouse as _os_mouse
elif _platform.system() == 'Linux':
from. import _nixmouse as _os_mouse
elif _platform.system() == 'Darwin':
from. import _darwinmouse as _os_mouse
else:
raise OSError("Unsupported platform '{}'".format(_platform.system()))
from ._mouse_event import ButtonEvent, MoveEvent, WheelEvent, LEFT, RIGHT, MIDDLE, X, X2, UP, DOWN, DOUBLE
from ._generic import GenericListener as _GenericListener
_pressed_events = set()
class _MouseListener(_GenericListener):
def init(self):
_os_mouse.init()
def pre_process_event(self, event):
if isinstance(event, ButtonEvent):
if event.event_type in (UP, DOUBLE):
_pressed_events.discard(event.button)
else:
_pressed_events.add(event.button)
return True
def listen(self):
_os_mouse.listen(self.queue)
_listener = _MouseListener()
def is_pressed(button=LEFT):
""" Returns True if the given button is currently pressed. """
_listener.start_if_necessary()
return button in _pressed_events
def press(button=LEFT):
""" Presses the given button (but doesn't release). """
_os_mouse.press(button)
def release(button=LEFT):
""" Releases the given button. """
_os_mouse.release(button)
def click(button=LEFT):
""" Sends a click with the given button. """
_os_mouse.press(button)
_os_mouse.release(button)
def double_click(button=LEFT):
""" Sends a double click with the given button. """
click(button)
click(button)
def right_click():
""" Sends a right click with the given button. """
click(RIGHT)
def wheel(delta=1):
""" Scrolls the wheel `delta` clicks. Sign indicates direction. """
_os_mouse.wheel(delta)
def move(x, y, absolute=True, duration=0):
"""
Moves the mouse. If `absolute`, to position (x, y), otherwise move relative
to the current position. If `duration` is non-zero, animates the movement.
"""
x = int(x)
y = int(y)
# Requires an extra system call on Linux, but `move_relative` is measured
# in millimiters so we would lose precision.
position_x, position_y = get_position()
if not absolute:
x = position_x + x
y = position_y + y
if duration:
start_x = position_x
start_y = position_y
dx = x - start_x
dy = y - start_y
if dx == 0 and dy == 0:
_time.sleep(duration)
else:
# 120 movements per second.
# Round and keep float to ensure float division in Python 2
steps = max(1.0, float(int(duration * 120.0)))
for i in range(int(steps)+1):
move(start_x + dx*i/steps, start_y + dy*i/steps)
_time.sleep(duration/steps)
else:
_os_mouse.move_to(x, y)
def drag(start_x, start_y, end_x, end_y, absolute=True, duration=0):
"""
Holds the left mouse button, moving from start to end position, then
releases. `absolute` and `duration` are parameters regarding the mouse
movement.
"""
if is_pressed():
release()
move(start_x, start_y, absolute, 0)
press()
move(end_x, end_y, absolute, duration)
release()
def on_button(callback, args=(), buttons=(LEFT, MIDDLE, RIGHT, X, X2), types=(UP, DOWN, DOUBLE)):
""" Invokes `callback` with `args` when the specified event happens. """
if not isinstance(buttons, (tuple, list)):
buttons = (buttons,)
if not isinstance(types, (tuple, list)):
types = (types,)
def handler(event):
if isinstance(event, ButtonEvent):
if event.event_type in types and event.button in buttons:
callback(*args)
_listener.add_handler(handler)
return handler
def on_click(callback, args=()):
""" Invokes `callback` with `args` when the left button is clicked. """
return on_button(callback, args, [LEFT], [UP])
def on_double_click(callback, args=()):
"""
Invokes `callback` with `args` when the left button is double clicked.
"""
return on_button(callback, args, [LEFT], [DOUBLE])
def on_right_click(callback, args=()):
""" Invokes `callback` with `args` when the right button is clicked. """
return on_button(callback, args, [RIGHT], [UP])
def on_middle_click(callback, args=()):
""" Invokes `callback` with `args` when the middle button is clicked. """
return on_button(callback, args, [MIDDLE], [UP])
def wait(button=LEFT, target_types=(UP, DOWN, DOUBLE)):
"""
Blocks program execution until the given button performs an event.
"""
from threading import Lock
lock = Lock()
lock.acquire()
handler = on_button(lock.release, (), [button], target_types)
lock.acquire()
_listener.remove_handler(handler)
def get_position():
""" Returns the (x, y) mouse position. """
return _os_mouse.get_position()
def hook(callback):
"""
Installs a global listener on all available mouses, invoking `callback`
each time it is moved, a key status changes or the wheel is spun. A mouse
event is passed as argument, with type either `mouse.ButtonEvent`,
`mouse.WheelEvent` or `mouse.MoveEvent`.
Returns the given callback for easier development.
"""
_listener.add_handler(callback)
return callback
def unhook(callback):
"""
Removes a previously installed hook.
"""
_listener.remove_handler(callback)
def unhook_all():
"""
Removes all hooks registered by this application. Note this may include
hooks installed by high level functions, such as `record`.
"""
del _listener.handlers[:]
def record(button=RIGHT, target_types=(DOWN,)):
"""
Records all mouse events until the user presses the given button.
Then returns the list of events recorded. Pairs well with `play(events)`.
Note: this is a blocking function.
Note: for more details on the mouse hook and events see `hook`.
"""
recorded = []
hook(recorded.append)
wait(button=button, target_types=target_types)
unhook(recorded.append)
return recorded
def play(events, speed_factor=1.0, include_clicks=True, include_moves=True, include_wheel=True):
"""
Plays a sequence of recorded events, maintaining the relative time
intervals. If speed_factor is <= 0 then the actions are replayed as fast
as the OS allows. Pairs well with `record()`.
The parameters `include_*` define if events of that type should be inluded
in the replay or ignored.
"""
last_time = None
for event in events:
if speed_factor > 0 and last_time is not None:
_time.sleep((event.time - last_time) / speed_factor)
last_time = event.time
if isinstance(event, ButtonEvent) and include_clicks:
if event.event_type == UP:
_os_mouse.release(event.button)
else:
_os_mouse.press(event.button)
elif isinstance(event, MoveEvent) and include_moves:
_os_mouse.move_to(event.x, event.y)
elif isinstance(event, WheelEvent) and include_wheel:
_os_mouse.wheel(event.delta)
replay = play
hold = press
if __name__ == '__main__':
print('Recording... Double click to stop and replay.')
play(record())
Loading…
Cancel
Save