You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ORPA-pyOpenRPA/Sources/pyOpenRPA/Robot/Audio.py

391 lines
20 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from pyOpenRPA.Tools import CrossOS
if CrossOS.IS_WINDOWS_BOOL: import pyaudio
if CrossOS.IS_WINDOWS_BOOL: from pydub import AudioSegment
import threading
import time
from pyOpenRPA.Utils import Text
import os
def DeviceMicrophoneIndex():
"""L-,W+: Выполнить поиск устройства, с помощью которого можно будет выполнить захват c микрофона.
"""
p = pyaudio.PyAudio()
lDeviceInfoDict = p.get_default_input_device_info()
lDefaultIndexInt = lDeviceInfoDict["index"]
return lDefaultIndexInt
def DeviceSystemSoundIndex():
"""L-,W+: Выполнить поиск устройства, с помощью которого можно будет выполнить захват аудио, которое поступает из приложений. Например: аудиоконференции Zoom, whatsapp, telegram и т.д.
"""
p = pyaudio.PyAudio()
inInputBool = True
inIsLoopbackBool = True
if inInputBool == True:
lDeviceInfoDict = p.get_default_output_device_info()
lDefaultIndexInt = lDeviceInfoDict["index"]
lDefaultNameStr = lDeviceInfoDict["name"]
lCatchIndexInt = None
lCatchDiffRatioFloat = 0.0
for lItemDict in DeviceListGet():
lCompareBool = False
if lItemDict["MaxOutputChannelsInt"]>0:
if inIsLoopbackBool==True and lItemDict["HostApiStr"]=="Windows WASAPI": lCompareBool = True
elif inIsLoopbackBool==False: lCompareBool = True
if lCompareBool == True:
lDiffRationFloat = Text.SimilarityNoCase(in1Str=lDefaultNameStr, in2Str=lItemDict["NameStr"])
if lDiffRationFloat> lCatchDiffRatioFloat:
lCatchDiffRatioFloat = lDiffRationFloat
lCatchIndexInt=lItemDict["IndexInt"]
else:
lDeviceInfoDict = p.get_default_output_device_info()
lDefaultIndexInt = lDeviceInfoDict["index"]
lDefaultNameStr = lDeviceInfoDict["name"]
lCatchIndexInt = None
lCatchDiffRatioFloat = 0.0
for lItemDict in DeviceListGet():
lCompareBool = False
if lItemDict["MaxInputChannelsInt"]>0:
if inIsLoopbackBool==True and lItemDict["HostApiStr"]=="Windows WASAPI": lCompareBool = True
elif inIsLoopbackBool==False: lCompareBool = True
if lCompareBool == True:
lDiffRationFloat = Text.SimilarityNoCase(in1Str=lDefaultNameStr, in2Str=lItemDict["NameStr"])
if lDiffRationFloat> lCatchDiffRatioFloat: lCatchIndexInt=lItemDict["IndexInt"]
return lCatchIndexInt
def DeviceListGet():
"""L-,W+: Вернуть список аудио устройст (входящих и исходящих, микрофонов и динамиков).
from pyOpenRPA.Robot import Audio
Audio.DeviceListGet()
:return: [{"IndexInt":1, "NameStr": "",
"HostApiInt": 0, "HostApiStr": "MME"|"Windows WASAPI"|"Windows WDM-KS",
"MaxInputChannelsInt": 0, "MaxOutputChannelsInt": 0,
"DefaultSampleRateFloat": 44100.0
},...]
:rtype: list
"""
l_result = []
p = pyaudio.PyAudio()
for i in range(0, p.get_device_count()):
l_info = p.get_device_info_by_index(i)
l_info_dict = {
"IndexInt":l_info["index"],
"NameStr": l_info["name"],
"MaxInputChannelsInt": l_info["maxInputChannels"],
"MaxOutputChannelsInt": l_info["maxOutputChannels"],
"HostApiInt": l_info["hostApi"],
"DefaultSampleRateFloat": l_info["defaultSampleRate"],
"HostApiStr": p.get_host_api_info_by_index(l_info["hostApi"])["name"] #"MME"|"Windows WASAPI"|"Windows WDM-KS"
}
l_result.append(l_info_dict)
return l_result
class Recorder:
mStatusStr = "0_READY" # "0_READY", "1_RECORDING"
mAudio = None
mCaptureThread = None
mStream = None
mDeviceInt = None
mChannelCountInt = None
mFramesInt = 512
mRecordedFramesList = []
mSampleRateInt = None
mSampleSizeInt = None
mCaptureBool = True
mFolderPathStr = None
mFileNameStr = None
mFileFormatStr = None
mFileAvailableChunkInt = None
mFileNameList=None
mFileInfoDict=None # {"file.mp3":{StartSecFloat:, EndSecFloat:, Extra:,PathStr, }}
mChunkSecFloat = None
mStartSecFloat = None
mStartChunkSecFloat = None
mDurationSecFloat = None
mThresholdInt = 500
mSilentLastCheckTimeFloat = None
mIsMicrophoneBool=None
mCallbackChunkDef = None
mCallbackStopDef = None
def __init__(self, inDeviceInt=None):
"""L-,W+: Инициализация экземпляра класса записи звука
.. code-block:: python
from pyOpenRPA.Robot import Audio
lRec = Audio.Recorder()
:param inDeviceInt: , по умолчанию None (использование устройства, полученного от DeviceSystemSoundIndex())
:type inDeviceInt: int, опционально
"""
self.mDeviceInt = inDeviceInt
if inDeviceInt == None: inDeviceInt = DeviceSystemSoundIndex()
self.mDeviceInt = inDeviceInt
if DeviceListGet()[inDeviceInt]["MaxInputChannelsInt"]>0: self.mIsMicrophoneBool = True
else: self.mIsMicrophoneBool = False
def StatusGet(self):
"""L-,W+: Вернуть статус записи звука
.. code-block:: python
from pyOpenRPA.Robot import Audio
lRec = Audio.Recorder()
lRec.StatusGet()
:return: "0_READY" или "1_RECORDING"
:rtype: str
"""
return self.mStatusStr
def CaptureStart(self, inFolderPathStr="",inFileNameStr = "out", inFileFormatStr = "mp3", inDurationSecFloat = None, inChunkSecFloat = 300.0, inCallbackChunkDef = None, inCallbackStopDef = None):
"""L-,W+: Начать запись звука
.. code-block:: python
def CallbackChunk(lRec, lFilenameStr):
pass # КОД ОБРАБОТКИ ПОСЛЕ СОХРАНЕНИЯ ЧАСТИ
from pyOpenRPA.Robot import Audio
lRec = Audio.Recorder()
lRec.CaptureStart(inFileNameStr = "out", inFileFormatStr = "mp3", inDurationSecFloat = None, inChunkSecFloat = 5.0, inCallbackChunkDef=CallbackChunk)
lRec.CaptureStop()
:param inFolderPathStr: Путь к папке, в которую сохранять аудиофайлы захвата , по умолчанию ""
:type inFolderPathStr: str, опционально
:param inFileNameStr: Наименование файла без расширения, по умолчанию "out"
:type inFileNameStr: str, опционально
:param inFileFormatStr: Наименование формата, в который будет происходить сохранение ("mp3" или "wav" или "raw" или "aif"), по умолчанию "mp3"
:type inFileFormatStr: str, опционально
:param inDurationSecFloat: Длительность захвата аудио, по умолчанию None (пока не поступит команда CaptureStop() )
:type inDurationSecFloat: float, опционально
:param inChunkSecFloat: Максимальная длина части аудиофайла, по умолчанию 300.0
:type inChunkSecFloat: float, опционально
:param inCallbackChunkDef: Функция, которая будет инициирована в случае выполнения Chunk сохранения (сохранение части). Callback функция должна принимать 2 аргумента: экземпляр класса Recorder и наименование сохраненного файла. Внимание! Функция запускается асинхронно!
:type inCallbackChunkDef: def, опционально
:param inCallbackStopDef: Функция, которая будет инициирована в случае окончания записи. Callback функция должна принимать 2 аргумента: экземпляр класса Recorder и наименование сохраненного файла. Внимание! Функция запускается асинхронно!
:type inCallbackStopDef: def, опционально
"""
# CHECK AUX.mp3
self.mFileInfoDict = {}
self.mFileNameList=[]
#self.mRecordedFramesList=[]
self.mStatusStr = "1_RECORDING"
if inChunkSecFloat != None: self.mFileAvailableChunkInt = 0
self.mDurationSecFloat = inDurationSecFloat
self.mChunkSecFloat = inChunkSecFloat
self.mSilentLastCheckTimeFloat=time.time()
self.mFolderPathStr = inFolderPathStr
self.mFileNameStr = inFileNameStr
self.mFileFormatStr = inFileFormatStr
self.mAudio = pyaudio.PyAudio()
self.mSampleSizeInt = self.mAudio.get_sample_size(pyaudio.paInt16)
self.mCallbackChunkDef = inCallbackChunkDef
self.mCallbackStopDef = inCallbackStopDef
lDeviceInfoDict = self.mAudio.get_device_info_by_index(self.mDeviceInt)
#Open stream
self.mSampleRateInt = int(lDeviceInfoDict["defaultSampleRate"])
self.mChannelCountInt = lDeviceInfoDict["maxInputChannels"] if (lDeviceInfoDict["maxOutputChannels"] < lDeviceInfoDict["maxInputChannels"]) else lDeviceInfoDict["maxOutputChannels"]
self.mStream = self.mAudio.open(format = pyaudio.paInt16,
channels = self.mChannelCountInt,
rate = self.mSampleRateInt,
input = True,
frames_per_buffer = self.mFramesInt,
input_device_index = lDeviceInfoDict["index"],
as_loopback = not self.mIsMicrophoneBool)
self.mCaptureThread = threading.Thread(target=self.__Capture__, daemon = True)
self.mStartSecFloat = time.time()
self.mStartChunkSecFloat = self.mStartSecFloat
self.mCaptureThread.start()
def __Capture__(self):
while self.mCaptureBool==True:
self.mRecordedFramesList.append(self.mStream.read(self.mFramesInt))
self.__TriggerCheck__()
def CaptureWait(self):
"""L-,W+: Ожидать окончания захвата аудио
.. code-block:: python
from pyOpenRPA.Robot import Audio
lRec = Audio.Recorder()
lRec.CaptureStart(inFileNameStr = "out", inFileFormatStr = "mp3", inDurationSecFloat = 10.0, inChunkSecFloat = 5.0)
lRec.CaptureWait()
"""
self.mCaptureThread.join()
def CaptureStop(self, inWaitStream=True, inExtra=None):
"""L-,W+: Остановить захват аудио
.. code-block:: python
from pyOpenRPA.Robot import Audio
lRec = Audio.Recorder()
lRec.CaptureStart(inFileNameStr = "out", inFileFormatStr = "mp3", inDurationSecFloat = None, inChunkSecFloat = 5.0)
lRec.CaptureStop()
:param inWaitStream: True - выполнить ожидание окончания потока захвата перед окончанием, по умолчанию True
:type inWaitStream: bool, опционально
:param inExtra: Дополнительный контент, необходимый для идентификации файла. В дальнейшем получить структуру можно с помощью функции FileInfoGet()['Extra'], по умолчанию None
:type inExtra: any, опционально
"""
self.mCaptureBool=False
self.mStream.stop_stream()
if inWaitStream == True: self.mCaptureThread.join()
self.mStream.close()
#Close module
self.mAudio.terminate()
lFileNameStr = self.CaptureChunk(inExtra=inExtra, inForceChunkBool=False)
self.mStatusStr = "0_READY"
if self.mCallbackStopDef != None:
lCallbackThread = threading.Thread(target=self.mCallbackStopDef,args=[self, lFileNameStr], daemon = True)
lCallbackThread.start()
def CaptureChunk(self, inExtra=None, inForceChunkBool=True, inShiftSecFloat = 0.0):
"""L-,W+: Зафиксировать захват аудио в виде промежуточного файла вида: <имя файла>_00000.mp3
.. code-block:: python
from pyOpenRPA.Robot import Audio
lRec = Audio.Recorder()
lRec.CaptureStart(inFileNameStr = "out", inFileFormatStr = "mp3", inDurationSecFloat = None, inChunkSecFloat = 5.0)
lRec.CaptureChunk()
:param inExtra: Дополнительный контент, необходимый для идентификации файла. В дальнейшем получить структуру можно с помощью функции FileInfoGet()['Extra'], по умолчанию None
:type inExtra: any, опционально
:param inForceChunkBool: True - вне зависимости от текущего режима перейти на режим сохранения по частям, по умолчанию True
:type inForceChunkBool: bool, опционально
:param inShiftSecFloat: Последние секунды, которые не записывать в промежуточный аудиофайл. Они будут началом следующего аудио отрывка, по умолчанию 0.0
:type inShiftSecFloat: float
:return: Наименование сохраненного аудиофайла
:rtype: str
"""
if inForceChunkBool==True and self.mFileAvailableChunkInt==None: self.mFileAvailableChunkInt=0
lFileNameStr = self.mFileNameStr
if self.mFileAvailableChunkInt!=None:
lFileNameStr+=f"_{self.mFileAvailableChunkInt:05}"
self.mFileAvailableChunkInt = self.mFileAvailableChunkInt + 1
lFileNameWExtStr = f"{lFileNameStr}.{self.mFileFormatStr}"
lFilePathStr = os.path.abspath(os.path.join(self.mFolderPathStr,lFileNameWExtStr))
lDataFrameBytes = b''.join(self.mRecordedFramesList)
if inShiftSecFloat != 0.0:
lFrameLenInt = int(self.mSampleSizeInt*inShiftSecFloat*self.mChannelCountInt*self.mSampleRateInt)
lFrameLenInt = int(lFrameLenInt // (self.mChannelCountInt * self.mSampleSizeInt)) * (self.mChannelCountInt * self.mSampleSizeInt)
if lFrameLenInt<len(lDataFrameBytes):
self.mRecordedFramesList=[lDataFrameBytes[-lFrameLenInt:]]
lDataFrameBytes = lDataFrameBytes[:-lFrameLenInt]
else:
lDataFrameBytes=b""
else:
self.mRecordedFramesList = []
# Advanced usage, if you have raw audio data:
sound = AudioSegment(
# raw audio data (bytes)
data=lDataFrameBytes,
# 2 byte (16 bit) samples
sample_width=self.mSampleSizeInt,
# 44.1 kHz frame rate
frame_rate=self.mSampleRateInt,
# stereo
channels=self.mChannelCountInt
)
lTimeSecFloat = time.time() - inShiftSecFloat
if not os.path.exists(os.path.abspath(self.mFolderPathStr)):
os.mkdir(self.mFolderPathStr)
sound.export(lFilePathStr, format=f"{self.mFileFormatStr}")
self.mFileNameList.append(lFileNameWExtStr)
self.mFileInfoDict[lFileNameWExtStr]= {
"StartSecFloat": self.mStartChunkSecFloat,
"EndSecFloat": lTimeSecFloat,
"Extra": inExtra,
"PathStr": lFilePathStr
}
if self.mCallbackChunkDef != None and self.mFileAvailableChunkInt!=None:
lCallbackThread = threading.Thread(target=self.mCallbackChunkDef,args=[self, lFileNameWExtStr], daemon = True)
lCallbackThread.start()
self.mStartChunkSecFloat = lTimeSecFloat
return lFileNameWExtStr
def FileInfoGet(self, inFileNameStr=None):
"""L-,W+: Вернуть информацию по аудиофайлу inFileNameStr. Если inFileNameStr == None -> Функция вернет информацию по последнему записанному файлу
.. code-block:: python
from pyOpenRPA.Robot import Audio
lRec = Audio.Recorder()
lRec.CaptureStart(inFileNameStr = "out", inFileFormatStr = "mp3", inDurationSecFloat = None, inChunkSecFloat = 5.0)
lRec.CaptureChunk()
lFileInfoDict = lRec.FileInfoGet()
:param inFileNameStr: Наименование аудиофайла с указанием расширения, по умолчанию None (взять последний записанный файл)
:type inFileNameStr: str, опционально
:return: {StartSecFloat:, EndSecFloat:, Extra:, PathStr:, } или None
:rtype: dict
"""
if inFileNameStr == None: inFileNameStr = self.FileLastGet()
return self.mFileInfoDict.get(inFileNameStr, None)
def FileListGet(self):
"""L-,W+: Вернуть список сохраненных аудиофайлов (наименования)
.. code-block:: python
from pyOpenRPA.Robot import Audio
lRec = Audio.Recorder()
lRec.CaptureStart(inFileNameStr = "out", inFileFormatStr = "mp3", inDurationSecFloat = None, inChunkSecFloat = 5.0)
lRec.CaptureChunk()
lRec.CaptureChunk()
lRec.FileListGet()
:return: ["out_00000.mp3", "out_00001.mp3", ...]
:rtype: list
"""
return self.mFileNameList
def FileLastGet(self):
"""L-,W+: Вернуть наименование последнего сохраненного аудиофайла
.. code-block:: python
from pyOpenRPA.Robot import Audio
lRec = Audio.Recorder()
lRec.CaptureStart(inFileNameStr = "out", inFileFormatStr = "mp3", inDurationSecFloat = None, inChunkSecFloat = 5.0)
lRec.CaptureChunk()
lRec.CaptureChunk()
lRec.FileListGet()
:return: ["out_00000.mp3", "out_00001.mp3", ...]
:rtype: list
"""
return self.mFileNameList[-1]
#def IsSilent(self, inLastSecFloat=None):
# "Returns 'True' if below the 'silent' threshold"
# self.mSilentLastCheckTimeFloat = time.time()
# if inLastSecFloat == None: inLastSecFloat = self.mChunkSilentSecFloat
# lFrameLenInt = int(self.mSampleSizeInt*inLastSecFloat)
# if lFrameLenInt<len(self.mRecordedFramesList): lData = self.mRecordedFramesList[-lFrameLenInt:]
# else: lData = self.mRecordedFramesList
# return max(lData) < self.mThresholdInt
def __TriggerCheck__(self):
"""Контроль записи / остановки аудио по следующим критериям:
- Общая длительность,
- Максимальная длительность части,
- Максимальная длит тишины (часть),
- Максимальная длительность тишины (остановка),
"""
# Проверка по длине записи (CHUNK)
if self.mChunkSecFloat != None and time.time() - self.mStartChunkSecFloat > self.mChunkSecFloat: self.CaptureChunk()
# Остановка записи по максимальной длине
if self.mDurationSecFloat != None and time.time() - self.mStartSecFloat > self.mDurationSecFloat: self.CaptureStop(inWaitStream=False)
# Проверка тишины
#if self.mChunkSilentSecFloat != None and time.time() - self.mSilentLastCheckTimeFloat and self.IsSilent(): self.mT.append("ТИШИНА!!")