You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

364 lines
17 KiB

import pyaudio
from pydub import AudioSegment
import threading
import wave
import time
from pyOpenRPA.Utils import Text
import os
def DeviceMicrophoneIndex():
"""L-,W+: Выполнить поиск устройства, с помощью которого можно будет выполнить захват c микрофона.
p = pyaudio.PyAudio()
lDeviceInfoDict = p.get_default_input_device_info()
lDefaultIndexInt = lDeviceInfoDict["index"]
return lDefaultIndexInt
def DeviceSystemSoundIndex():
"""L-,W+: Выполнить поиск устройства, с помощью которого можно будет выполнить захват аудио, которое поступает из приложений. Например: аудиоконференции Zoom, whatsapp, telegram и т.д.
p = pyaudio.PyAudio()
inInputBool = True
inIsLoopbackBool = True
if inInputBool == True:
lDeviceInfoDict = p.get_default_output_device_info()
lDefaultIndexInt = lDeviceInfoDict["index"]
lDefaultNameStr = lDeviceInfoDict["name"]
lCatchIndexInt = None
lCatchDiffRatioFloat = 0.0
for lItemDict in DeviceListGet():
lCompareBool = False
if lItemDict["MaxOutputChannelsInt"]>0:
if inIsLoopbackBool==True and lItemDict["HostApiStr"]=="Windows WASAPI": lCompareBool = True
elif inIsLoopbackBool==False: lCompareBool = True
if lCompareBool == True:
lDiffRationFloat = Text.SimilarityNoCase(in1Str=lDefaultNameStr, in2Str=lItemDict["NameStr"])
if lDiffRationFloat> lCatchDiffRatioFloat:
lCatchDiffRatioFloat = lDiffRationFloat
lDeviceInfoDict = p.get_default_output_device_info()
lDefaultIndexInt = lDeviceInfoDict["index"]
lDefaultNameStr = lDeviceInfoDict["name"]
lCatchIndexInt = None
lCatchDiffRatioFloat = 0.0
for lItemDict in DeviceListGet():
lCompareBool = False
if lItemDict["MaxInputChannelsInt"]>0:
if inIsLoopbackBool==True and lItemDict["HostApiStr"]=="Windows WASAPI": lCompareBool = True
elif inIsLoopbackBool==False: lCompareBool = True
if lCompareBool == True:
lDiffRationFloat = Text.SimilarityNoCase(in1Str=lDefaultNameStr, in2Str=lItemDict["NameStr"])
if lDiffRationFloat> lCatchDiffRatioFloat: lCatchIndexInt=lItemDict["IndexInt"]
return lCatchIndexInt
def DeviceListGet():
"""L-,W+: Вернуть список аудио устройст (входящих и исходящих, микрофонов и динамиков).
from pyOpenRPA.Robot import Audio
:return: [{"IndexInt":1, "NameStr": "",
"HostApiInt": 0, "HostApiStr": "MME"|"Windows WASAPI"|"Windows WDM-KS",
"MaxInputChannelsInt": 0, "MaxOutputChannelsInt": 0,
"DefaultSampleRateFloat": 44100.0
:rtype: list
l_result = []
p = pyaudio.PyAudio()
for i in range(0, p.get_device_count()):
l_info = p.get_device_info_by_index(i)
l_info_dict = {
"NameStr": l_info["name"],
"MaxInputChannelsInt": l_info["maxInputChannels"],
"MaxOutputChannelsInt": l_info["maxOutputChannels"],
"HostApiInt": l_info["hostApi"],
"DefaultSampleRateFloat": l_info["defaultSampleRate"],
"HostApiStr": p.get_host_api_info_by_index(l_info["hostApi"])["name"] #"MME"|"Windows WASAPI"|"Windows WDM-KS"
return l_result
class Recorder:
mStatusStr = "0_READY" # "0_READY", "1_RECORDING"
mAudio = None
mCaptureThread = None
mStream = None
mDeviceInt = None
mChannelCountInt = None
mFramesInt = 512
mRecordedFramesList = []
mSampleRateInt = None
mSampleSizeInt = None
mCaptureBool = True
mFolderPathStr = None
mFileNameStr = None
mFileFormatStr = None
mFileAvailableChunkInt = None
mFileInfoDict=None # {"file.mp3":{StartSecFloat:, EndSecFloat:, Extra:,PathStr, }}
mChunkSecFloat = None
mStartSecFloat = None
mStartChunkSecFloat = None
mDurationSecFloat = None
mThresholdInt = 500
mSilentLastCheckTimeFloat = None
def __init__(self, inDeviceInt=None):
"""L-,W+: Инициализация экземпляра класса записи звука
.. code-block:: python
from pyOpenRPA.Robot import Audio
lRec = Audio.Recorder()
:param inDeviceInt: , по умолчанию None (использование устройства, полученного от DeviceSystemSoundIndex())
:type inDeviceInt: int, опционально
self.mDeviceInt = inDeviceInt
if inDeviceInt == None: inDeviceInt = DeviceSystemSoundIndex()
self.mDeviceInt = inDeviceInt
if DeviceListGet()[inDeviceInt]["MaxInputChannelsInt"]>0: self.mIsMicrophoneBool = True
else: self.mIsMicrophoneBool = False
def StatusGet(self):
"""L-,W+: Вернуть статус записи звука
.. code-block:: python
from pyOpenRPA.Robot import Audio
lRec = Audio.Recorder()
:return: "0_READY" или "1_RECORDING"
:rtype: str
return self.mStatusStr
def CaptureStart(self, inFolderPathStr="",inFileNameStr = "out", inFileFormatStr = "mp3", inDurationSecFloat = None, inChunkSecFloat = 300.0):
"""L-,W+: Начать запись звука
.. code-block:: python
from pyOpenRPA.Robot import Audio
lRec = Audio.Recorder()
lRec.CaptureStart(inFileNameStr = "out", inFileFormatStr = "mp3", inDurationSecFloat = None, inChunkSecFloat = 5.0, inChunkSilentSecFloat = 10.0)
:param inFolderPathStr: Путь к папке, в которую сохранять аудиофайлы захвата , по умолчанию ""
:type inFolderPathStr: str, опционально
:param inFileNameStr: Наименование файла без расширения, по умолчанию "out"
:type inFileNameStr: str, опционально
:param inFileFormatStr: Наименование формата, в который будет происходить сохранение ("mp3" или "wav" или "raw" или "aif"), по умолчанию "mp3"
:type inFileFormatStr: str, опционально
:param inDurationSecFloat: Длительность захвата аудио, по умолчанию None (пока не поступит команда CaptureStop() )
:type inDurationSecFloat: float, опционально
:param inChunkSecFloat: Максимальная длина части аудиофайла, по умолчанию 300.0
:type inChunkSecFloat: float, опционально
self.mFileInfoDict = {}
self.mStatusStr = "1_RECORDING"
if inChunkSecFloat != None: self.mFileAvailableChunkInt = 0
self.mDurationSecFloat = inDurationSecFloat
self.mChunkSecFloat = inChunkSecFloat
self.mFolderPathStr = inFolderPathStr
self.mFileNameStr = inFileNameStr
self.mFileFormatStr = inFileFormatStr
self.mAudio = pyaudio.PyAudio()
self.mSampleSizeInt = self.mAudio.get_sample_size(pyaudio.paInt16)
lDeviceInfoDict = self.mAudio.get_device_info_by_index(self.mDeviceInt)
#Open stream
self.mSampleRateInt = int(lDeviceInfoDict["defaultSampleRate"])
self.mChannelCountInt = lDeviceInfoDict["maxInputChannels"] if (lDeviceInfoDict["maxOutputChannels"] < lDeviceInfoDict["maxInputChannels"]) else lDeviceInfoDict["maxOutputChannels"]
self.mStream = = pyaudio.paInt16,
channels = self.mChannelCountInt,
rate = self.mSampleRateInt,
input = True,
frames_per_buffer = self.mFramesInt,
input_device_index = lDeviceInfoDict["index"],
as_loopback = not self.mIsMicrophoneBool)
self.mCaptureThread = threading.Thread(target=self.__Capture__)
self.mStartSecFloat = time.time()
self.mStartChunkSecFloat = self.mStartSecFloat
def __Capture__(self):
while self.mCaptureBool==True:
def CaptureWait(self):
"""L-,W+: Ожидать окончания захвата аудио
.. code-block:: python
from pyOpenRPA.Robot import Audio
lRec = Audio.Recorder()
lRec.CaptureStart(inFileNameStr = "out", inFileFormatStr = "mp3", inDurationSecFloat = 10.0, inChunkSecFloat = 5.0, inChunkSilentSecFloat = 10.0)
def CaptureStop(self, inWaitStream=True, inExtra=None):
"""L-,W+: Остановить захват аудио
.. code-block:: python
from pyOpenRPA.Robot import Audio
lRec = Audio.Recorder()
lRec.CaptureStart(inFileNameStr = "out", inFileFormatStr = "mp3", inDurationSecFloat = None, inChunkSecFloat = 5.0, inChunkSilentSecFloat = 10.0)
:param inWaitStream: True - выполнить ожидание окончания потока захвата перед окончанием, по умолчанию True
:type inWaitStream: bool, опционально
:param inExtra: Дополнительный контент, необходимый для идентификации файла. В дальнейшем получить структуру можно с помощью функции FileInfoGet()['Extra'], по умолчанию None
:type inExtra: any, опционально
if inWaitStream == True: self.mCaptureThread.join()
#Close module
self.CaptureChunk(inExtra=inExtra, inForceChunkBool=False)
self.mStatusStr = "0_READY"
def CaptureChunk(self, inExtra=None, inForceChunkBool=True):
"""L-,W+: Зафиксировать захват аудио в виде промежуточного файла вида: <имя файла>_00000.mp3
.. code-block:: python
from pyOpenRPA.Robot import Audio
lRec = Audio.Recorder()
lRec.CaptureStart(inFileNameStr = "out", inFileFormatStr = "mp3", inDurationSecFloat = None, inChunkSecFloat = 5.0, inChunkSilentSecFloat = 10.0)
:param inExtra: Дополнительный контент, необходимый для идентификации файла. В дальнейшем получить структуру можно с помощью функции FileInfoGet()['Extra'], по умолчанию None
:type inExtra: any, опционально
:param inForceChunkBool: True - вне зависимости от текущего режима перейти на режим сохранения по частям, по умолчанию True
:type inForceChunkBool: bool, опционально
if inForceChunkBool==True and self.mFileAvailableChunkInt==None: self.mFileAvailableChunkInt=0
lFileNameStr = self.mFileNameStr
if self.mFileAvailableChunkInt!=None:
self.mFileAvailableChunkInt = self.mFileAvailableChunkInt + 1
lFilePathStr = os.path.abspath(os.path.join(self.mFolderPathStr,f"{lFileNameStr}.{self.mFileFormatStr}"))
# Advanced usage, if you have raw audio data:
sound = AudioSegment(
# raw audio data (bytes)
# 2 byte (16 bit) samples
# 44.1 kHz frame rate
# stereo
self.mRecordedFramesList = []
lTimeSecFloat = time.time()
sound.export(lFilePathStr, format=f"{self.mFileFormatStr}")
self.mFileInfoDict[f"{lFileNameStr}.{self.mFileFormatStr}"]= {
"StartSecFloat": self.mStartChunkSecFloat,
"EndSecFloat": lTimeSecFloat,
"Extra": inExtra,
"PathStr": lFilePathStr
self.mStartChunkSecFloat = lTimeSecFloat
def FileInfoGet(self, inFileNameStr=None):
"""L-,W+: Вернуть информацию по аудиофайлу inFileNameStr. Если inFileNameStr == None -> Функция вернет информацию по последнему записанному файлу
.. code-block:: python
from pyOpenRPA.Robot import Audio
lRec = Audio.Recorder()
lRec.CaptureStart(inFileNameStr = "out", inFileFormatStr = "mp3", inDurationSecFloat = None, inChunkSecFloat = 5.0, inChunkSilentSecFloat = 10.0)
lFileInfoDict = lRec.FileInfoGet()
:param inFileNameStr: Наименование аудиофайла с указанием расширения, по умолчанию None (взять последний записанный файл)
:type inFileNameStr: str, опционально
:return: {StartSecFloat:, EndSecFloat:, Extra:, PathStr:, } или None
:rtype: dict
if inFileNameStr == None: inFileNameStr = self.FileLastGet()
return self.mFileInfoDict.get(inFileNameStr, None)
def FileListGet(self):
"""L-,W+: Вернуть список сохраненных аудиофайлов (наименования)
.. code-block:: python
from pyOpenRPA.Robot import Audio
lRec = Audio.Recorder()
lRec.CaptureStart(inFileNameStr = "out", inFileFormatStr = "mp3", inDurationSecFloat = None, inChunkSecFloat = 5.0, inChunkSilentSecFloat = 10.0)
:return: ["out_00000.mp3", "out_00001.mp3", ...]
:rtype: list
return self.mFileNameList
def FileLastGet(self):
"""L-,W+: Вернуть наименование последнего сохраненного аудиофайла
.. code-block:: python
from pyOpenRPA.Robot import Audio
lRec = Audio.Recorder()
lRec.CaptureStart(inFileNameStr = "out", inFileFormatStr = "mp3", inDurationSecFloat = None, inChunkSecFloat = 5.0, inChunkSilentSecFloat = 10.0)
:return: ["out_00000.mp3", "out_00001.mp3", ...]
:rtype: list
return self.mFileNameList[-1]
def __Callback__(self, inDefList):
def __CallbackIsChunked__(self):
def __CallbackIsStopped__(self):
#def IsSilent(self, inLastSecFloat=None):
# "Returns 'True' if below the 'silent' threshold"
# self.mSilentLastCheckTimeFloat = time.time()
# if inLastSecFloat == None: inLastSecFloat = self.mChunkSilentSecFloat
# lFrameLenInt = int(self.mSampleSizeInt*inLastSecFloat)
# if lFrameLenInt<len(self.mRecordedFramesList): lData = self.mRecordedFramesList[-lFrameLenInt:]
# else: lData = self.mRecordedFramesList
# return max(lData) < self.mThresholdInt
def __TriggerCheck__(self):
"""Контроль записи / остановки аудио по следующим критериям:
- Общая длительность,
- Максимальная длительность части,
- Максимальная длит тишины (часть),
- Максимальная длительность тишины (остановка),
# Проверка по длине записи (CHUNK)
if self.mChunkSecFloat != None and time.time() - self.mStartChunkSecFloat > self.mChunkSecFloat: self.CaptureChunk()
# Остановка записи по максимальной длине
if self.mDurationSecFloat != None and time.time() - self.mStartSecFloat > self.mDurationSecFloat: self.CaptureStop(inWaitStream=False)
# Проверка тишины
#if self.mChunkSilentSecFloat != None and time.time() - self.mSilentLastCheckTimeFloat and self.IsSilent(): self.mT.append("ТИШИНА!!")