import sys , types
from . lock import allocate_lock
from . error import CDefError
from . import model
try :
callable
except NameError :
# Python 3.1
from collections import Callable
callable = lambda x : isinstance ( x , Callable )
try :
basestring
except NameError :
# Python 3.x
basestring = str
_unspecified = object ( )
class FFI ( object ) :
r '''
The main top - level class that you instantiate once , or once per module .
Example usage :
ffi = FFI ( )
ffi . cdef ( """
int printf ( const char * , . . . ) ;
""" )
C = ffi . dlopen ( None ) # standard library
- or -
C = ffi . verify ( ) # use a C compiler: verify the decl above is right
C . printf ( " hello, %s ! \n " , ffi . new ( " char[] " , " world " ) )
'''
def __init__ ( self , backend = None ) :
""" Create an FFI instance. The ' backend ' argument is used to
select a non - default backend , mostly for tests .
"""
if backend is None :
# You need PyPy (>= 2.0 beta), or a CPython (>= 2.6) with
# _cffi_backend.so compiled.
import _cffi_backend as backend
from . import __version__
if backend . __version__ != __version__ :
# bad version! Try to be as explicit as possible.
if hasattr ( backend , ' __file__ ' ) :
# CPython
raise Exception ( " Version mismatch: this is the ' cffi ' package version %s , located in %r . When we import the top-level ' _cffi_backend ' extension module, we get version %s , located in %r . The two versions should be equal; check your installation. " % (
__version__ , __file__ ,
backend . __version__ , backend . __file__ ) )
else :
# PyPy
raise Exception ( " Version mismatch: this is the ' cffi ' package version %s , located in %r . This interpreter comes with a built-in ' _cffi_backend ' module, which is version %s . The two versions should be equal; check your installation. " % (
__version__ , __file__ , backend . __version__ ) )
# (If you insist you can also try to pass the option
# 'backend=backend_ctypes.CTypesBackend()', but don't
# rely on it! It's probably not going to work well.)
from . import cparser
self . _backend = backend
self . _lock = allocate_lock ( )
self . _parser = cparser . Parser ( )
self . _cached_btypes = { }
self . _parsed_types = types . ModuleType ( ' parsed_types ' ) . __dict__
self . _new_types = types . ModuleType ( ' new_types ' ) . __dict__
self . _function_caches = [ ]
self . _libraries = [ ]
self . _cdefsources = [ ]
self . _included_ffis = [ ]
self . _windows_unicode = None
self . _init_once_cache = { }
self . _cdef_version = None
self . _embedding = None
self . _typecache = model . get_typecache ( backend )
if hasattr ( backend , ' set_ffi ' ) :
backend . set_ffi ( self )
for name in list ( backend . __dict__ ) :
if name . startswith ( ' RTLD_ ' ) :
setattr ( self , name , getattr ( backend , name ) )
#
with self . _lock :
self . BVoidP = self . _get_cached_btype ( model . voidp_type )
self . BCharA = self . _get_cached_btype ( model . char_array_type )
if isinstance ( backend , types . ModuleType ) :
# _cffi_backend: attach these constants to the class
if not hasattr ( FFI , ' NULL ' ) :
FFI . NULL = self . cast ( self . BVoidP , 0 )
FFI . CData , FFI . CType = backend . _get_types ( )
else :
# ctypes backend: attach these constants to the instance
self . NULL = self . cast ( self . BVoidP , 0 )
self . CData , self . CType = backend . _get_types ( )
self . buffer = backend . buffer
def cdef ( self , csource , override = False , packed = False , pack = None ) :
""" Parse the given C source. This registers all declared functions,
types , and global variables . The functions and global variables can
then be accessed via either ' ffi.dlopen() ' or ' ffi.verify() ' .
The types can be used in ' ffi.new() ' and other functions .
If ' packed ' is specified as True , all structs declared inside this
cdef are packed , i . e . laid out without any field alignment at all .
Alternatively , ' pack ' can be a small integer , and requests for
alignment greater than that are ignored ( pack = 1 is equivalent to
packed = True ) .
"""
self . _cdef ( csource , override = override , packed = packed , pack = pack )
def embedding_api ( self , csource , packed = False , pack = None ) :
self . _cdef ( csource , packed = packed , pack = pack , dllexport = True )
if self . _embedding is None :
self . _embedding = ' '
def _cdef ( self , csource , override = False , * * options ) :
if not isinstance ( csource , str ) : # unicode, on Python 2
if not isinstance ( csource , basestring ) :
raise TypeError ( " cdef() argument must be a string " )
csource = csource . encode ( ' ascii ' )
with self . _lock :
self . _cdef_version = object ( )
self . _parser . parse ( csource , override = override , * * options )
self . _cdefsources . append ( csource )
if override :
for cache in self . _function_caches :
cache . clear ( )
finishlist = self . _parser . _recomplete
if finishlist :
self . _parser . _recomplete = [ ]
for tp in finishlist :
tp . finish_backend_type ( self , finishlist )
def dlopen ( self , name , flags = 0 ) :
""" Load and return a dynamic library identified by ' name ' .
The standard C library can be loaded by passing None .
Note that functions and types declared by ' ffi.cdef() ' are not
linked to a particular library , just like C headers ; in the
library we only look for the actual ( untyped ) symbols .
"""
if not ( isinstance ( name , basestring ) or
name is None or
isinstance ( name , self . CData ) ) :
raise TypeError ( " dlopen(name): name must be a file name, None, "
" or an already-opened ' void * ' handle " )
with self . _lock :
lib , function_cache = _make_ffi_library ( self , name , flags )
self . _function_caches . append ( function_cache )
self . _libraries . append ( lib )
return lib
def dlclose ( self , lib ) :
""" Close a library obtained with ffi.dlopen(). After this call,
access to functions or variables from the library will fail
( possibly with a segmentation fault ) .
"""
type ( lib ) . __cffi_close__ ( lib )
def _typeof_locked ( self , cdecl ) :
# call me with the lock!
key = cdecl
if key in self . _parsed_types :
return self . _parsed_types [ key ]
#
if not isinstance ( cdecl , str ) : # unicode, on Python 2
cdecl = cdecl . encode ( ' ascii ' )
#
type = self . _parser . parse_type ( cdecl )
really_a_function_type = type . is_raw_function
if really_a_function_type :
type = type . as_function_pointer ( )
btype = self . _get_cached_btype ( type )
result = btype , really_a_function_type
self . _parsed_types [ key ] = result
return result
def _typeof ( self , cdecl , consider_function_as_funcptr = False ) :
# string -> ctype object
try :
result = self . _parsed_types [ cdecl ]
except KeyError :
with self . _lock :
result = self . _typeof_locked ( cdecl )
#
btype , really_a_function_type = result
if really_a_function_type and not consider_function_as_funcptr :
raise CDefError ( " the type %r is a function type, not a "
" pointer-to-function type " % ( cdecl , ) )
return btype
def typeof ( self , cdecl ) :
""" Parse the C type given as a string and return the
corresponding < ctype > object .
It can also be used on ' cdata ' instance to get its C type .
"""
if isinstance ( cdecl , basestring ) :
return self . _typeof ( cdecl )
if isinstance ( cdecl , self . CData ) :
return self . _backend . typeof ( cdecl )
if isinstance ( cdecl , types . BuiltinFunctionType ) :
res = _builtin_function_type ( cdecl )
if res is not None :
return res
if ( isinstance ( cdecl , types . FunctionType )
and hasattr ( cdecl , ' _cffi_base_type ' ) ) :
with self . _lock :
return self . _get_cached_btype ( cdecl . _cffi_base_type )
raise TypeError ( type ( cdecl ) )
def sizeof ( self , cdecl ) :
""" Return the size in bytes of the argument. It can be a
string naming a C type , or a ' cdata ' instance .
"""
if isinstance ( cdecl , basestring ) :
BType = self . _typeof ( cdecl )
return self . _backend . sizeof ( BType )
else :
return self . _backend . sizeof ( cdecl )
def alignof ( self , cdecl ) :
""" Return the natural alignment size in bytes of the C type
given as a string .
"""
if isinstance ( cdecl , basestring ) :
cdecl = self . _typeof ( cdecl )
return self . _backend . alignof ( cdecl )
def offsetof ( self , cdecl , * fields_or_indexes ) :
""" Return the offset of the named field inside the given
structure or array , which must be given as a C type name .
You can give several field names in case of nested structures .
You can also give numeric values which correspond to array
items , in case of an array type .
"""
if isinstance ( cdecl , basestring ) :
cdecl = self . _typeof ( cdecl )
return self . _typeoffsetof ( cdecl , * fields_or_indexes ) [ 1 ]
def new ( self , cdecl , init = None ) :
""" Allocate an instance according to the specified C type and
return a pointer to it . The specified C type must be either a
pointer or an array : ` ` new ( ' X * ' ) ` ` allocates an X and returns
a pointer to it , whereas ` ` new ( ' X[n] ' ) ` ` allocates an array of
n X ' es and returns an array referencing it (which works
mostly like a pointer , like in C ) . You can also use
` ` new ( ' X[] ' , n ) ` ` to allocate an array of a non - constant
length n .
The memory is initialized following the rules of declaring a
global variable in C : by default it is zero - initialized , but
an explicit initializer can be given which can be used to
fill all or part of the memory .
When the returned < cdata > object goes out of scope , the memory
is freed . In other words the returned < cdata > object has
ownership of the value of type ' cdecl ' that it points to . This
means that the raw data can be used as long as this object is
kept alive , but must not be used for a longer time . Be careful
about that when copying the pointer to the memory somewhere
else , e . g . into another structure .
"""
if isinstance ( cdecl , basestring ) :
cdecl = self . _typeof ( cdecl )
return self . _backend . newp ( cdecl , init )
def new_allocator ( self , alloc = None , free = None ,
should_clear_after_alloc = True ) :
""" Return a new allocator, i.e. a function that behaves like ffi.new()
but uses the provided low - level ' alloc ' and ' free ' functions .
' alloc ' is called with the size as argument . If it returns NULL , a
MemoryError is raised . ' free ' is called with the result of ' alloc '
as argument . Both can be either Python function or directly C
functions . If ' free ' is None , then no free function is called .
If both ' alloc ' and ' free ' are None , the default is used .
If ' should_clear_after_alloc ' is set to False , then the memory
returned by ' alloc ' is assumed to be already cleared ( or you are
fine with garbage ) ; otherwise CFFI will clear it .
"""
compiled_ffi = self . _backend . FFI ( )
allocator = compiled_ffi . new_allocator ( alloc , free ,
should_clear_after_alloc )
def allocate ( cdecl , init = None ) :
if isinstance ( cdecl , basestring ) :
cdecl = self . _typeof ( cdecl )
return allocator ( cdecl , init )
return allocate
def cast ( self , cdecl , source ) :
""" Similar to a C cast: returns an instance of the named C
type initialized with the given ' source ' . The source is
casted between integers or pointers of any type .
"""
if isinstance ( cdecl , basestring ) :
cdecl = self . _typeof ( cdecl )
return self . _backend . cast ( cdecl , source )
def string ( self , cdata , maxlen = - 1 ) :
""" Return a Python string (or unicode string) from the ' cdata ' .
If ' cdata ' is a pointer or array of characters or bytes , returns
the null - terminated string . The returned string extends until
the first null character , or at most ' maxlen ' characters . If
' cdata ' is an array then ' maxlen ' defaults to its length .
If ' cdata ' is a pointer or array of wchar_t , returns a unicode
string following the same rules .
If ' cdata ' is a single character or byte or a wchar_t , returns
it as a string or unicode string .
If ' cdata ' is an enum , returns the value of the enumerator as a
string , or ' NUMBER ' if the value is out of range .
"""
return self . _backend . string ( cdata , maxlen )
def unpack ( self , cdata , length ) :
""" Unpack an array of C data of the given length,
returning a Python string / unicode / list .
If ' cdata ' is a pointer to ' char ' , returns a byte string .
It does not stop at the first null . This is equivalent to :
ffi . buffer ( cdata , length ) [ : ]
If ' cdata ' is a pointer to ' wchar_t ' , returns a unicode string .
' length ' is measured in wchar_t ' s; it is not the size in bytes.
If ' cdata ' is a pointer to anything else , returns a list of
' length ' items . This is a faster equivalent to :
[ cdata [ i ] for i in range ( length ) ]
"""
return self . _backend . unpack ( cdata , length )
#def buffer(self, cdata, size=-1):
# """Return a read-write buffer object that references the raw C data
# pointed to by the given 'cdata'. The 'cdata' must be a pointer or
# an array. Can be passed to functions expecting a buffer, or directly
# manipulated with:
#
# buf[:] get a copy of it in a regular string, or
# buf[idx] as a single character
# buf[:] = ...
# buf[idx] = ... change the content
# """
# note that 'buffer' is a type, set on this instance by __init__
def from_buffer ( self , cdecl , python_buffer = _unspecified ,
require_writable = False ) :
""" Return a cdata of the given type pointing to the data of the
given Python object , which must support the buffer interface .
Note that this is not meant to be used on the built - in types
str or unicode ( you can build ' char[] ' arrays explicitly )
but only on objects containing large quantities of raw data
in some other format , like ' array.array ' or numpy arrays .
The first argument is optional and default to ' char[] ' .
"""
if python_buffer is _unspecified :
cdecl , python_buffer = self . BCharA , cdecl
elif isinstance ( cdecl , basestring ) :
cdecl = self . _typeof ( cdecl )
return self . _backend . from_buffer ( cdecl , python_buffer ,
require_writable )
def memmove ( self , dest , src , n ) :
""" ffi.memmove(dest, src, n) copies n bytes of memory from src to dest.
Like the C function memmove ( ) , the memory areas may overlap ;
apart from that it behaves like the C function memcpy ( ) .
' src ' can be any cdata ptr or array , or any Python buffer object .
' dest ' can be any cdata ptr or array , or a writable Python buffer
object . The size to copy , ' n ' , is always measured in bytes .
Unlike other methods , this one supports all Python buffer including
byte strings and bytearrays - - - but it still does not support
non - contiguous buffers .
"""
return self . _backend . memmove ( dest , src , n )
def callback ( self , cdecl , python_callable = None , error = None , onerror = None ) :
""" Return a callback object or a decorator making such a
callback object . ' cdecl ' must name a C function pointer type .
The callback invokes the specified ' python_callable ' ( which may
be provided either directly or via a decorator ) . Important : the
callback object must be manually kept alive for as long as the
callback may be invoked from the C level .
"""
def callback_decorator_wrap ( python_callable ) :
if not callable ( python_callable ) :
raise TypeError ( " the ' python_callable ' argument "
" is not callable " )
return self . _backend . callback ( cdecl , python_callable ,
error , onerror )
if isinstance ( cdecl , basestring ) :
cdecl = self . _typeof ( cdecl , consider_function_as_funcptr = True )
if python_callable is None :
return callback_decorator_wrap # decorator mode
else :
return callback_decorator_wrap ( python_callable ) # direct mode
def getctype ( self , cdecl , replace_with = ' ' ) :
""" Return a string giving the C type ' cdecl ' , which may be itself
a string or a < ctype > object . If ' replace_with ' is given , it gives
extra text to append ( or insert for more complicated C types ) , like
a variable name , or ' * ' to get actually the C type ' pointer-to-cdecl ' .
"""
if isinstance ( cdecl , basestring ) :
cdecl = self . _typeof ( cdecl )
replace_with = replace_with . strip ( )
if ( replace_with . startswith ( ' * ' )
and ' &[ ' in self . _backend . getcname ( cdecl , ' & ' ) ) :
replace_with = ' ( %s ) ' % replace_with
elif replace_with and not replace_with [ 0 ] in ' [( ' :
replace_with = ' ' + replace_with
return self . _backend . getcname ( cdecl , replace_with )
def gc ( self , cdata , destructor , size = 0 ) :
""" Return a new cdata object that points to the same
data . Later , when this new cdata object is garbage - collected ,
' destructor(old_cdata_object) ' will be called .
The optional ' size ' gives an estimate of the size , used to
trigger the garbage collection more eagerly . So far only used
on PyPy . It tells the GC that the returned object keeps alive
roughly ' size ' bytes of external memory .
"""
return self . _backend . gcp ( cdata , destructor , size )
def _get_cached_btype ( self , type ) :
assert self . _lock . acquire ( False ) is False
# call me with the lock!
try :
BType = self . _cached_btypes [ type ]
except KeyError :
finishlist = [ ]
BType = type . get_cached_btype ( self , finishlist )
for type in finishlist :
type . finish_backend_type ( self , finishlist )
return BType
def verify ( self , source = ' ' , tmpdir = None , * * kwargs ) :
""" Verify that the current ffi signatures compile on this
machine , and return a dynamic library object . The dynamic
library can be used to call functions and access global
variables declared in this ' ffi ' . The library is compiled
by the C compiler : it gives you C - level API compatibility
( including calling macros ) . This is unlike ' ffi.dlopen() ' ,
which requires binary compatibility in the signatures .
"""
from . verifier import Verifier , _caller_dir_pycache
#
# If set_unicode(True) was called, insert the UNICODE and
# _UNICODE macro declarations
if self . _windows_unicode :
self . _apply_windows_unicode ( kwargs )
#
# Set the tmpdir here, and not in Verifier.__init__: it picks
# up the caller's directory, which we want to be the caller of
# ffi.verify(), as opposed to the caller of Veritier().
tmpdir = tmpdir or _caller_dir_pycache ( )
#
# Make a Verifier() and use it to load the library.
self . verifier = Verifier ( self , source , tmpdir , * * kwargs )
lib = self . verifier . load_library ( )
#
# Save the loaded library for keep-alive purposes, even
# if the caller doesn't keep it alive itself (it should).
self . _libraries . append ( lib )
return lib
def _get_errno ( self ) :
return self . _backend . get_errno ( )
def _set_errno ( self , errno ) :
self . _backend . set_errno ( errno )
errno = property ( _get_errno , _set_errno , None ,
" the value of ' errno ' from/to the C calls " )
def getwinerror ( self , code = - 1 ) :
return self . _backend . getwinerror ( code )
def _pointer_to ( self , ctype ) :
with self . _lock :
return model . pointer_cache ( self , ctype )
def addressof ( self , cdata , * fields_or_indexes ) :
""" Return the address of a <cdata ' struct-or-union ' >.
If ' fields_or_indexes ' are given , returns the address of that
field or array item in the structure or array , recursively in
case of nested structures .
"""
try :
ctype = self . _backend . typeof ( cdata )
except TypeError :
if ' __addressof__ ' in type ( cdata ) . __dict__ :
return type ( cdata ) . __addressof__ ( cdata , * fields_or_indexes )
raise
if fields_or_indexes :
ctype , offset = self . _typeoffsetof ( ctype , * fields_or_indexes )
else :
if ctype . kind == " pointer " :
raise TypeError ( " addressof(pointer) " )
offset = 0
ctypeptr = self . _pointer_to ( ctype )
return self . _backend . rawaddressof ( ctypeptr , cdata , offset )
def _typeoffsetof ( self , ctype , field_or_index , * fields_or_indexes ) :
ctype , offset = self . _backend . typeoffsetof ( ctype , field_or_index )
for field1 in fields_or_indexes :
ctype , offset1 = self . _backend . typeoffsetof ( ctype , field1 , 1 )
offset + = offset1
return ctype , offset
def include ( self , ffi_to_include ) :
""" Includes the typedefs, structs, unions and enums defined
in another FFI instance . Usage is similar to a #include in C,
where a part of the program might include types defined in
another part for its own usage . Note that the include ( )
method has no effect on functions , constants and global
variables , which must anyway be accessed directly from the
lib object returned by the original FFI instance .
"""
if not isinstance ( ffi_to_include , FFI ) :
raise TypeError ( " ffi.include() expects an argument that is also of "
" type cffi.FFI, not %r " % (
type ( ffi_to_include ) . __name__ , ) )
if ffi_to_include is self :
raise ValueError ( " self.include(self) " )
with ffi_to_include . _lock :
with self . _lock :
self . _parser . include ( ffi_to_include . _parser )
self . _cdefsources . append ( ' [ ' )
self . _cdefsources . extend ( ffi_to_include . _cdefsources )
self . _cdefsources . append ( ' ] ' )
self . _included_ffis . append ( ffi_to_include )
def new_handle ( self , x ) :
return self . _backend . newp_handle ( self . BVoidP , x )
def from_handle ( self , x ) :
return self . _backend . from_handle ( x )
def release ( self , x ) :
self . _backend . release ( x )
def set_unicode ( self , enabled_flag ) :
""" Windows: if ' enabled_flag ' is True, enable the UNICODE and
_UNICODE defines in C , and declare the types like TCHAR and LPTCSTR
to be ( pointers to ) wchar_t . If ' enabled_flag ' is False ,
declare these types to be ( pointers to ) plain 8 - bit characters .
This is mostly for backward compatibility ; you usually want True .
"""
if self . _windows_unicode is not None :
raise ValueError ( " set_unicode() can only be called once " )
enabled_flag = bool ( enabled_flag )
if enabled_flag :
self . cdef ( " typedef wchar_t TBYTE; "
" typedef wchar_t TCHAR; "
" typedef const wchar_t *LPCTSTR; "
" typedef const wchar_t *PCTSTR; "
" typedef wchar_t *LPTSTR; "
" typedef wchar_t *PTSTR; "
" typedef TBYTE *PTBYTE; "
" typedef TCHAR *PTCHAR; " )
else :
self . cdef ( " typedef char TBYTE; "
" typedef char TCHAR; "
" typedef const char *LPCTSTR; "
" typedef const char *PCTSTR; "
" typedef char *LPTSTR; "
" typedef char *PTSTR; "
" typedef TBYTE *PTBYTE; "
" typedef TCHAR *PTCHAR; " )
self . _windows_unicode = enabled_flag
def _apply_windows_unicode ( self , kwds ) :
defmacros = kwds . get ( ' define_macros ' , ( ) )
if not isinstance ( defmacros , ( list , tuple ) ) :
raise TypeError ( " ' define_macros ' must be a list or tuple " )
defmacros = list ( defmacros ) + [ ( ' UNICODE ' , ' 1 ' ) ,
( ' _UNICODE ' , ' 1 ' ) ]
kwds [ ' define_macros ' ] = defmacros
def _apply_embedding_fix ( self , kwds ) :
# must include an argument like "-lpython2.7" for the compiler
def ensure ( key , value ) :
lst = kwds . setdefault ( key , [ ] )
if value not in lst :
lst . append ( value )
#
if ' __pypy__ ' in sys . builtin_module_names :
import os
if sys . platform == " win32 " :
# we need 'libpypy-c.lib'. Current distributions of
# pypy (>= 4.1) contain it as 'libs/python27.lib'.
pythonlib = " python {0[0]} {0[1]} " . format ( sys . version_info )
if hasattr ( sys , ' prefix ' ) :
ensure ( ' library_dirs ' , os . path . join ( sys . prefix , ' libs ' ) )
else :
# we need 'libpypy-c.{so,dylib}', which should be by
# default located in 'sys.prefix/bin' for installed
# systems.
if sys . version_info < ( 3 , ) :
pythonlib = " pypy-c "
else :
pythonlib = " pypy3-c "
if hasattr ( sys , ' prefix ' ) :
ensure ( ' library_dirs ' , os . path . join ( sys . prefix , ' bin ' ) )
# On uninstalled pypy's, the libpypy-c is typically found in
# .../pypy/goal/.
if hasattr ( sys , ' prefix ' ) :
ensure ( ' library_dirs ' , os . path . join ( sys . prefix , ' pypy ' , ' goal ' ) )
else :
if sys . platform == " win32 " :
template = " python %d %d "
if hasattr ( sys , ' gettotalrefcount ' ) :
template + = ' _d '
else :
try :
import sysconfig
except ImportError : # 2.6
from distutils import sysconfig
template = " python %d . %d "
if sysconfig . get_config_var ( ' DEBUG_EXT ' ) :
template + = sysconfig . get_config_var ( ' DEBUG_EXT ' )
pythonlib = ( template %
( sys . hexversion >> 24 , ( sys . hexversion >> 16 ) & 0xff ) )
if hasattr ( sys , ' abiflags ' ) :
pythonlib + = sys . abiflags
ensure ( ' libraries ' , pythonlib )
if sys . platform == " win32 " :
ensure ( ' extra_link_args ' , ' /MANIFEST ' )
def set_source ( self , module_name , source , source_extension = ' .c ' , * * kwds ) :
import os
if hasattr ( self , ' _assigned_source ' ) :
raise ValueError ( " set_source() cannot be called several times "
" per ffi object " )
if not isinstance ( module_name , basestring ) :
raise TypeError ( " ' module_name ' must be a string " )
if os . sep in module_name or ( os . altsep and os . altsep in module_name ) :
raise ValueError ( " ' module_name ' must not contain ' / ' : use a dotted "
" name to make a ' package.module ' location " )
self . _assigned_source = ( str ( module_name ) , source ,
source_extension , kwds )
def set_source_pkgconfig ( self , module_name , pkgconfig_libs , source ,
source_extension = ' .c ' , * * kwds ) :
from . import pkgconfig
if not isinstance ( pkgconfig_libs , list ) :
raise TypeError ( " the pkgconfig_libs argument must be a list "
" of package names " )
kwds2 = pkgconfig . flags_from_pkgconfig ( pkgconfig_libs )
pkgconfig . merge_flags ( kwds , kwds2 )
self . set_source ( module_name , source , source_extension , * * kwds )
def distutils_extension ( self , tmpdir = ' build ' , verbose = True ) :
from distutils . dir_util import mkpath
from . recompiler import recompile
#
if not hasattr ( self , ' _assigned_source ' ) :
if hasattr ( self , ' verifier ' ) : # fallback, 'tmpdir' ignored
return self . verifier . get_extension ( )
raise ValueError ( " set_source() must be called before "
" distutils_extension() " )
module_name , source , source_extension , kwds = self . _assigned_source
if source is None :
raise TypeError ( " distutils_extension() is only for C extension "
" modules, not for dlopen()-style pure Python "
" modules " )
mkpath ( tmpdir )
ext , updated = recompile ( self , module_name ,
source , tmpdir = tmpdir , extradir = tmpdir ,
source_extension = source_extension ,
call_c_compiler = False , * * kwds )
if verbose :
if updated :
sys . stderr . write ( " regenerated: %r \n " % ( ext . sources [ 0 ] , ) )
else :
sys . stderr . write ( " not modified: %r \n " % ( ext . sources [ 0 ] , ) )
return ext
def emit_c_code ( self , filename ) :
from . recompiler import recompile
#
if not hasattr ( self , ' _assigned_source ' ) :
raise ValueError ( " set_source() must be called before emit_c_code() " )
module_name , source , source_extension , kwds = self . _assigned_source
if source is None :
raise TypeError ( " emit_c_code() is only for C extension modules, "
" not for dlopen()-style pure Python modules " )
recompile ( self , module_name , source ,
c_file = filename , call_c_compiler = False , * * kwds )
def emit_python_code ( self , filename ) :
from . recompiler import recompile
#
if not hasattr ( self , ' _assigned_source ' ) :
raise ValueError ( " set_source() must be called before emit_c_code() " )
module_name , source , source_extension , kwds = self . _assigned_source
if source is not None :
raise TypeError ( " emit_python_code() is only for dlopen()-style "
" pure Python modules, not for C extension modules " )
recompile ( self , module_name , source ,
c_file = filename , call_c_compiler = False , * * kwds )
def compile ( self , tmpdir = ' . ' , verbose = 0 , target = None , debug = None ) :
""" The ' target ' argument gives the final file name of the
compiled DLL . Use ' * ' to force distutils ' choice, suitable for
regular CPython C API modules . Use a file name ending in ' .* '
to ask for the system ' s default extension for dynamic libraries
( . so / . dll / . dylib ) .
The default is ' * ' when building a non - embedded C API extension ,
and ( module_name + ' .* ' ) when building an embedded library .
"""
from . recompiler import recompile
#
if not hasattr ( self , ' _assigned_source ' ) :
raise ValueError ( " set_source() must be called before compile() " )
module_name , source , source_extension , kwds = self . _assigned_source
return recompile ( self , module_name , source , tmpdir = tmpdir ,
target = target , source_extension = source_extension ,
compiler_verbose = verbose , debug = debug , * * kwds )
def init_once ( self , func , tag ) :
# Read _init_once_cache[tag], which is either (False, lock) if
# we're calling the function now in some thread, or (True, result).
# Don't call setdefault() in most cases, to avoid allocating and
# immediately freeing a lock; but still use setdefaut() to avoid
# races.
try :
x = self . _init_once_cache [ tag ]
except KeyError :
x = self . _init_once_cache . setdefault ( tag , ( False , allocate_lock ( ) ) )
# Common case: we got (True, result), so we return the result.
if x [ 0 ] :
return x [ 1 ]
# Else, it's a lock. Acquire it to serialize the following tests.
with x [ 1 ] :
# Read again from _init_once_cache the current status.
x = self . _init_once_cache [ tag ]
if x [ 0 ] :
return x [ 1 ]
# Call the function and store the result back.
result = func ( )
self . _init_once_cache [ tag ] = ( True , result )
return result
def embedding_init_code ( self , pysource ) :
if self . _embedding :
raise ValueError ( " embedding_init_code() can only be called once " )
# fix 'pysource' before it gets dumped into the C file:
# - remove empty lines at the beginning, so it starts at "line 1"
# - dedent, if all non-empty lines are indented
# - check for SyntaxErrors
import re
match = re . match ( r ' \ s* \ n ' , pysource )
if match :
pysource = pysource [ match . end ( ) : ]
lines = pysource . splitlines ( ) or [ ' ' ]
prefix = re . match ( r ' \ s* ' , lines [ 0 ] ) . group ( )
for i in range ( 1 , len ( lines ) ) :
line = lines [ i ]
if line . rstrip ( ) :
while not line . startswith ( prefix ) :
prefix = prefix [ : - 1 ]
i = len ( prefix )
lines = [ line [ i : ] + ' \n ' for line in lines ]
pysource = ' ' . join ( lines )
#
compile ( pysource , " cffi_init " , " exec " )
#
self . _embedding = pysource
def def_extern ( self , * args , * * kwds ) :
raise ValueError ( " ffi.def_extern() is only available on API-mode FFI "
" objects " )
def list_types ( self ) :
""" Returns the user type names known to this FFI instance.
This returns a tuple containing three lists of names :
( typedef_names , names_of_structs , names_of_unions )
"""
typedefs = [ ]
structs = [ ]
unions = [ ]
for key in self . _parser . _declarations :
if key . startswith ( ' typedef ' ) :
typedefs . append ( key [ 8 : ] )
elif key . startswith ( ' struct ' ) :
structs . append ( key [ 7 : ] )
elif key . startswith ( ' union ' ) :
unions . append ( key [ 6 : ] )
typedefs . sort ( )
structs . sort ( )
unions . sort ( )
return ( typedefs , structs , unions )
def _load_backend_lib ( backend , name , flags ) :
import os
if not isinstance ( name , basestring ) :
if sys . platform != " win32 " or name is not None :
return backend . load_library ( name , flags )
name = " c " # Windows: load_library(None) fails, but this works
# on Python 2 (backward compatibility hack only)
first_error = None
if ' . ' in name or ' / ' in name or os . sep in name :
try :
return backend . load_library ( name , flags )
except OSError as e :
first_error = e
import ctypes . util
path = ctypes . util . find_library ( name )
if path is None :
if name == " c " and sys . platform == " win32 " and sys . version_info > = ( 3 , ) :
raise OSError ( " dlopen(None) cannot work on Windows for Python 3 "
" (see http://bugs.python.org/issue23606) " )
msg = ( " ctypes.util.find_library() did not manage "
" to locate a library called %r " % ( name , ) )
if first_error is not None :
msg = " %s . Additionally, %s " % ( first_error , msg )
raise OSError ( msg )
return backend . load_library ( path , flags )
def _make_ffi_library ( ffi , libname , flags ) :
backend = ffi . _backend
backendlib = _load_backend_lib ( backend , libname , flags )
#
def accessor_function ( name ) :
key = ' function ' + name
tp , _ = ffi . _parser . _declarations [ key ]
BType = ffi . _get_cached_btype ( tp )
value = backendlib . load_function ( BType , name )
library . __dict__ [ name ] = value
#
def accessor_variable ( name ) :
key = ' variable ' + name
tp , _ = ffi . _parser . _declarations [ key ]
BType = ffi . _get_cached_btype ( tp )
read_variable = backendlib . read_variable
write_variable = backendlib . write_variable
setattr ( FFILibrary , name , property (
lambda self : read_variable ( BType , name ) ,
lambda self , value : write_variable ( BType , name , value ) ) )
#
def addressof_var ( name ) :
try :
return addr_variables [ name ]
except KeyError :
with ffi . _lock :
if name not in addr_variables :
key = ' variable ' + name
tp , _ = ffi . _parser . _declarations [ key ]
BType = ffi . _get_cached_btype ( tp )
if BType . kind != ' array ' :
BType = model . pointer_cache ( ffi , BType )
p = backendlib . load_function ( BType , name )
addr_variables [ name ] = p
return addr_variables [ name ]
#
def accessor_constant ( name ) :
raise NotImplementedError ( " non-integer constant ' %s ' cannot be "
" accessed from a dlopen() library " % ( name , ) )
#
def accessor_int_constant ( name ) :
library . __dict__ [ name ] = ffi . _parser . _int_constants [ name ]
#
accessors = { }
accessors_version = [ False ]
addr_variables = { }
#
def update_accessors ( ) :
if accessors_version [ 0 ] is ffi . _cdef_version :
return
#
for key , ( tp , _ ) in ffi . _parser . _declarations . items ( ) :
if not isinstance ( tp , model . EnumType ) :
tag , name = key . split ( ' ' , 1 )
if tag == ' function ' :
accessors [ name ] = accessor_function
elif tag == ' variable ' :
accessors [ name ] = accessor_variable
elif tag == ' constant ' :
accessors [ name ] = accessor_constant
else :
for i , enumname in enumerate ( tp . enumerators ) :
def accessor_enum ( name , tp = tp , i = i ) :
tp . check_not_partial ( )
library . __dict__ [ name ] = tp . enumvalues [ i ]
accessors [ enumname ] = accessor_enum
for name in ffi . _parser . _int_constants :
accessors . setdefault ( name , accessor_int_constant )
accessors_version [ 0 ] = ffi . _cdef_version
#
def make_accessor ( name ) :
with ffi . _lock :
if name in library . __dict__ or name in FFILibrary . __dict__ :
return # added by another thread while waiting for the lock
if name not in accessors :
update_accessors ( )
if name not in accessors :
raise AttributeError ( name )
accessors [ name ] ( name )
#
class FFILibrary ( object ) :
def __getattr__ ( self , name ) :
make_accessor ( name )
return getattr ( self , name )
def __setattr__ ( self , name , value ) :
try :
property = getattr ( self . __class__ , name )
except AttributeError :
make_accessor ( name )
setattr ( self , name , value )
else :
property . __set__ ( self , value )
def __dir__ ( self ) :
with ffi . _lock :
update_accessors ( )
return accessors . keys ( )
def __addressof__ ( self , name ) :
if name in library . __dict__ :
return library . __dict__ [ name ]
if name in FFILibrary . __dict__ :
return addressof_var ( name )
make_accessor ( name )
if name in library . __dict__ :
return library . __dict__ [ name ]
if name in FFILibrary . __dict__ :
return addressof_var ( name )
raise AttributeError ( " cffi library has no function or "
" global variable named ' %s ' " % ( name , ) )
def __cffi_close__ ( self ) :
backendlib . close_lib ( )
self . __dict__ . clear ( )
#
if isinstance ( libname , basestring ) :
try :
if not isinstance ( libname , str ) : # unicode, on Python 2
libname = libname . encode ( ' utf-8 ' )
FFILibrary . __name__ = ' FFILibrary_ %s ' % libname
except UnicodeError :
pass
library = FFILibrary ( )
return library , library . __dict__
def _builtin_function_type ( func ) :
# a hack to make at least ffi.typeof(builtin_function) work,
# if the builtin function was obtained by 'vengine_cpy'.
import sys
try :
module = sys . modules [ func . __module__ ]
ffi = module . _cffi_original_ffi
types_of_builtin_funcs = module . _cffi_types_of_builtin_funcs
tp = types_of_builtin_funcs [ func ]
except ( KeyError , AttributeError , TypeError ) :
return None
else :
with ffi . _lock :
return ffi . _get_cached_btype ( tp )