import pyaudio
from pydub import AudioSegment
import threading
import wave
import time
from pyOpenRPA . Utils import Text
import os
def DeviceMicrophoneIndex():
    """L-,W+: Find the device that can be used to capture audio from the microphone.

    Asks PyAudio for the default input device and returns its index.

    :return: value of the "index" field of the default input device info
    :rtype: int
    """
    lAudio = pyaudio.PyAudio()
    try:
        return lAudio.get_default_input_device_info()["index"]
    finally:
        # The PyAudio instance is only needed for this lookup; release it
        # instead of leaving it allocated for the rest of the process.
        lAudio.terminate()
def DeviceSystemSoundIndex():
    """L-,W+: Find the device that can be used to capture audio coming from applications.

    Example sources: audio conferences in Zoom, WhatsApp, Telegram, etc.

    The name of the default output device is compared (case-insensitive
    similarity via Text.SimilarityNoCase) with every device returned by
    DeviceListGet() that has output channels and belongs to the
    "Windows WASAPI" host API. The index of the most similar device is returned.

    :return: index of the best matching device, or None when no candidate
        has a similarity ratio above 0.0
    :rtype: int | None
    """
    lAudio = pyaudio.PyAudio()
    try:
        lDefaultNameStr = lAudio.get_default_output_device_info()["name"]
        lCatchIndexInt = None
        lCatchDiffRatioFloat = 0.0
        for lItemDict in DeviceListGet():
            # Only output-capable WASAPI devices are considered as loopback sources.
            if lItemDict["MaxOutputChannelsInt"] <= 0:
                continue
            if lItemDict["HostApiStr"] != "Windows WASAPI":
                continue
            lDiffRatioFloat = Text.SimilarityNoCase(
                in1Str=lDefaultNameStr, in2Str=lItemDict["NameStr"]
            )
            # Strict comparison: the first device wins among equally similar names.
            if lDiffRatioFloat > lCatchDiffRatioFloat:
                lCatchDiffRatioFloat = lDiffRatioFloat
                lCatchIndexInt = lItemDict["IndexInt"]
        return lCatchIndexInt
    finally:
        # Release the temporary PyAudio instance created for the lookup.
        lAudio.terminate()
def DeviceListGet():
    """L-,W+: Return the list of audio devices (input and output, microphones and speakers).

    Usage::

        from pyOpenRPA.Robot import Audio
        Audio.DeviceListGet()

    :return: one dict per device reported by PyAudio, in device-index order:
        [{"IndexInt": 1, "NameStr": "",
          "HostApiInt": 0, "HostApiStr": "MME" | "Windows WASAPI" | "Windows WDM-KS",
          "MaxInputChannelsInt": 0, "MaxOutputChannelsInt": 0,
          "DefaultSampleRateFloat": 44100.0
        }, ...]
    :rtype: list
    """
    lAudio = pyaudio.PyAudio()
    try:
        lResultList = []
        for lIndexInt in range(lAudio.get_device_count()):
            lInfoDict = lAudio.get_device_info_by_index(lIndexInt)
            lHostApiInt = lInfoDict["hostApi"]
            lResultList.append({
                "IndexInt": lInfoDict["index"],
                "NameStr": lInfoDict["name"],
                "MaxInputChannelsInt": lInfoDict["maxInputChannels"],
                "MaxOutputChannelsInt": lInfoDict["maxOutputChannels"],
                "HostApiInt": lHostApiInt,
                "DefaultSampleRateFloat": lInfoDict["defaultSampleRate"],
                # Host API name, e.g. "MME" | "Windows WASAPI" | "Windows WDM-KS"
                "HostApiStr": lAudio.get_host_api_info_by_index(lHostApiInt)["name"],
            })
        return lResultList
    finally:
        # Release the temporary PyAudio instance created for the enumeration.
        lAudio.terminate()
class Recorder:
    """Record audio from a PyAudio device into one or more files via pydub.

    A background thread reads fixed-size buffers from the opened stream.
    The recorded buffers are exported as a file when the chunk duration is
    exceeded, when the total duration is exceeded, or when CaptureStop() is
    called.
    """

    # Class-level values act as defaults; the mutable ones are re-bound per
    # instance in __init__ so that instances never share a list.
    mT = []
    mStatusStr = "0_READY"  # "0_READY", "1_RECORDING"
    mAudio = None
    mCaptureThread = None
    mStream = None
    mDeviceInt = None
    mChannelCountInt = None
    mFramesInt = 512  # frames requested per stream read
    mRecordedFramesList = []
    mSampleRateInt = None
    mSampleSizeInt = None
    mCaptureBool = True
    mFolderPathStr = None
    mFileNameStr = None
    mFileFormatStr = None
    mFileAvailableChunkInt = None
    mFileNameList = None
    mChunkSecFloat = None
    mChunkSilentSecFloat = None
    mStartSecFloat = None
    mStartChunkSecFloat = None
    mDurationSecFloat = None
    mThresholdInt = 500  # amplitude below which IsSilent() reports silence
    mSilentLastCheckTimeFloat = None
    mIsMicrophoneBool = None

    def __init__(self, inDeviceInt=None):
        """Bind the recorder to a device.

        :param inDeviceInt: device index as listed by DeviceListGet(); when
            None, the result of DeviceSystemSoundIndex() is used
        """
        if inDeviceInt is None:
            inDeviceInt = DeviceSystemSoundIndex()
        self.mDeviceInt = inDeviceInt
        # Per-instance containers (avoid sharing the class-level lists).
        self.mT = []
        self.mRecordedFramesList = []
        # A device with input channels is treated as a microphone; any other
        # device is opened in loopback mode by CaptureStart().
        self.mIsMicrophoneBool = DeviceListGet()[inDeviceInt]["MaxInputChannelsInt"] > 0

    def StatusGet(self):
        """Return the current status string: "0_READY" or "1_RECORDING"."""
        return self.mStatusStr

    def CaptureStart(self, inFolderPathStr="", inFileNameStr="out", inFileFormatStr="mp3", inDurationSecFloat=None, inChunkSecFloat=300.0, inChunkSilentSecFloat=10.0):
        """Open the device stream and start the background capture thread.

        :param inFolderPathStr: folder the audio files are written to
        :param inFileNameStr: file name without extension
        :param inFileFormatStr: extension and export format passed to pydub
        :param inDurationSecFloat: total recording duration after which the
            capture stops automatically; None disables the limit
        :param inChunkSecFloat: maximum duration of one file; None disables
            time-based chunking
        :param inChunkSilentSecFloat: default window, in seconds, used by IsSilent()
        """
        self.mFileNameList = []
        self.mRecordedFramesList = []
        # Files get a numeric suffix only when chunking is configured; reset to
        # None otherwise so a value from a previous run is not reused.
        if inChunkSecFloat is not None or inChunkSilentSecFloat is not None:
            self.mFileAvailableChunkInt = 0
        else:
            self.mFileAvailableChunkInt = None
        self.mDurationSecFloat = inDurationSecFloat
        self.mChunkSecFloat = inChunkSecFloat
        self.mChunkSilentSecFloat = inChunkSilentSecFloat
        self.mSilentLastCheckTimeFloat = time.time()
        self.mFolderPathStr = inFolderPathStr
        self.mFileNameStr = inFileNameStr
        self.mFileFormatStr = inFileFormatStr
        self.mAudio = pyaudio.PyAudio()
        try:
            self.mSampleSizeInt = self.mAudio.get_sample_size(pyaudio.paInt16)
            lDeviceInfoDict = self.mAudio.get_device_info_by_index(self.mDeviceInt)
            self.mSampleRateInt = int(lDeviceInfoDict["defaultSampleRate"])
            # Use whichever side of the device exposes more channels.
            self.mChannelCountInt = max(
                lDeviceInfoDict["maxInputChannels"],
                lDeviceInfoDict["maxOutputChannels"],
            )
            # Open stream
            self.mStream = self.mAudio.open(
                format=pyaudio.paInt16,
                channels=self.mChannelCountInt,
                rate=self.mSampleRateInt,
                input=True,
                frames_per_buffer=self.mFramesInt,
                input_device_index=lDeviceInfoDict["index"],
                as_loopback=not self.mIsMicrophoneBool,
            )
        except Exception:
            # Do not leave PyAudio allocated when the stream cannot be opened.
            self.mAudio.terminate()
            raise
        # Re-arm the loop flag: CaptureStop() leaves it False, which would make
        # a second recording finish immediately.
        self.mCaptureBool = True
        self.mStatusStr = "1_RECORDING"
        self.mCaptureThread = threading.Thread(target=self.__Capture__)
        self.mStartSecFloat = time.time()
        self.mStartChunkSecFloat = self.mStartSecFloat
        self.mCaptureThread.start()

    def __Capture__(self):
        """Thread body: read buffers from the stream until capture is stopped."""
        while self.mCaptureBool:
            self.mRecordedFramesList.append(self.mStream.read(self.mFramesInt))
            self.__TriggerCheck__()

    def CaptureStop(self, inWaitStream=True):
        """Stop the capture, close the stream and write the remaining audio.

        Does nothing when no recording is in progress (for example after the
        recording already stopped itself on reaching its total duration).

        :param inWaitStream: when True, wait for the capture thread to finish
            before closing the stream; must be False when called from the
            capture thread itself
        """
        if self.mStatusStr != "1_RECORDING":
            return
        self.mCaptureBool = False
        if inWaitStream:
            self.mCaptureThread.join()
        self.mStream.stop_stream()
        self.mStream.close()
        # Close module
        self.mAudio.terminate()
        self.CaptureChunk()
        self.mStatusStr = "0_READY"

    def CaptureChunk(self):
        """Export the buffers recorded so far as one file and start a new chunk."""
        lFileNameStr = self.mFileNameStr
        if self.mFileAvailableChunkInt is not None:
            # Zero-padded chunk number, e.g. "out_00000".
            lFileNameStr += f"_{self.mFileAvailableChunkInt:05}"
            self.mFileAvailableChunkInt += 1
        lFullNameStr = f"{lFileNameStr}.{self.mFileFormatStr}"
        # Build the segment from raw audio data.
        lSound = AudioSegment(
            # raw audio data (bytes)
            data=b"".join(self.mRecordedFramesList),
            # bytes per sample (paInt16)
            sample_width=self.mSampleSizeInt,
            # device default sample rate
            frame_rate=self.mSampleRateInt,
            channels=self.mChannelCountInt,
        )
        lSound.export(os.path.join(self.mFolderPathStr, lFullNameStr), format=f"{self.mFileFormatStr}")
        self.mFileNameList.append(lFullNameStr)
        self.mRecordedFramesList = []
        self.mStartChunkSecFloat = time.time()

    def FileListGet(self):
        """Return the list of file names written since the last CaptureStart()."""
        return self.mFileNameList

    def FileLastGet(self):
        """Return the name of the most recently written file, or None if there is none."""
        if not self.mFileNameList:
            return None
        return self.mFileNameList[-1]

    def __Callback__(self, inDefList):
        """Placeholder; currently does nothing."""
        pass

    def __CallbackIsSilent__(self):
        """Placeholder; currently does nothing."""
        pass

    def __CallbackIsChunked__(self):
        """Placeholder; currently does nothing."""
        pass

    def __CallbackIsStopped__(self):
        """Placeholder; currently does nothing."""
        pass

    def IsSilent(self, inLastSecFloat=None):
        """Return True if the recent audio stays below the silence threshold.

        The recorded buffers are interpreted as native-endian signed 16-bit
        samples (the stream is opened with paInt16) and the largest absolute
        amplitude is compared with mThresholdInt.

        :param inLastSecFloat: length of the inspected window in seconds;
            defaults to mChunkSilentSecFloat. When the window or the stream
            parameters are unknown, all recorded buffers are inspected.
        :return: True when no inspected sample reaches the threshold (also
            when nothing has been recorded yet)
        :rtype: bool
        """
        import math
        from array import array

        self.mSilentLastCheckTimeFloat = time.time()
        if inLastSecFloat is None:
            inLastSecFloat = self.mChunkSilentSecFloat
        lBufferList = self.mRecordedFramesList or []
        if inLastSecFloat is not None and self.mSampleRateInt and self.mFramesInt:
            # Every list item holds mFramesInt frames, so this many items
            # cover the requested number of seconds.
            lBufferCountInt = max(1, math.ceil(inLastSecFloat * self.mSampleRateInt / self.mFramesInt))
            lBufferList = lBufferList[-lBufferCountInt:]
        lRawBytes = b"".join(lBufferList)
        # Drop a trailing odd byte so the data always maps onto whole samples.
        lSampleArray = array("h", lRawBytes[:len(lRawBytes) - len(lRawBytes) % 2])
        if not lSampleArray:
            return True
        return max(abs(lSampleInt) for lSampleInt in lSampleArray) < self.mThresholdInt

    def __TriggerCheck__(self):
        """L-,W+: Control recording / stopping of audio by the following criteria:

        - total duration,
        - maximum chunk duration,
        - maximum silence duration (chunk),
        - maximum silence duration (stop).

        NOTE: only the two duration-based criteria are implemented here; the
        silence-based ones are not.
        """
        # Chunk length check
        if self.mChunkSecFloat is not None and time.time() - self.mStartChunkSecFloat > self.mChunkSecFloat:
            self.CaptureChunk()
        # Stop recording on reaching the maximum total length. This runs on the
        # capture thread, so it must not wait for (join) itself.
        if self.mDurationSecFloat is not None and time.time() - self.mStartSecFloat > self.mDurationSecFloat:
            self.CaptureStop(inWaitStream=False)