You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ORPA-pyOpenRPA/Sources/pyOpenRPA/Robot/Audio.py

176 lines
7.0 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import pyaudio
from pydub import AudioSegment
import threading
import wave
import time
from pyOpenRPA.Utils import Text
def DeviceMicrophoneIndex():
"""L-,W+: Выполнить поиск устройства, с помощью которого можно будет выполнить захват c микрофона.
"""
p = pyaudio.PyAudio()
lDeviceInfoDict = p.get_default_input_device_info()
lDefaultIndexInt = lDeviceInfoDict["index"]
return lDefaultIndexInt
def DeviceSystemSoundIndex():
"""L-,W+: Выполнить поиск устройства, с помощью которого можно будет выполнить захват аудио, которое поступает из приложений. Например: аудиоконференции Zoom, whatsapp, telegram и т.д.
"""
p = pyaudio.PyAudio()
inInputBool = True
inIsLoopbackBool = True
if inInputBool == True:
lDeviceInfoDict = p.get_default_output_device_info()
lDefaultIndexInt = lDeviceInfoDict["index"]
lDefaultNameStr = lDeviceInfoDict["name"]
lCatchIndexInt = None
lCatchDiffRatioFloat = 0.0
for lItemDict in DeviceListGet():
lCompareBool = False
if lItemDict["MaxOutputChannelsInt"]>0:
if inIsLoopbackBool==True and lItemDict["HostApiStr"]=="Windows WASAPI": lCompareBool = True
elif inIsLoopbackBool==False: lCompareBool = True
if lCompareBool == True:
lDiffRationFloat = Text.SimilarityNoCase(in1Str=lDefaultNameStr, in2Str=lItemDict["NameStr"])
if lDiffRationFloat> lCatchDiffRatioFloat: lCatchIndexInt=lItemDict["IndexInt"]
else:
lDeviceInfoDict = p.get_default_output_device_info()
lDefaultIndexInt = lDeviceInfoDict["index"]
lDefaultNameStr = lDeviceInfoDict["name"]
lCatchIndexInt = None
lCatchDiffRatioFloat = 0.0
for lItemDict in DeviceListGet():
lCompareBool = False
if lItemDict["MaxInputChannelsInt"]>0:
if inIsLoopbackBool==True and lItemDict["HostApiStr"]=="Windows WASAPI": lCompareBool = True
elif inIsLoopbackBool==False: lCompareBool = True
if lCompareBool == True:
lDiffRationFloat = Text.SimilarityNoCase(in1Str=lDefaultNameStr, in2Str=lItemDict["NameStr"])
if lDiffRationFloat> lCatchDiffRatioFloat: lCatchIndexInt=lItemDict["IndexInt"]
return lCatchIndexInt
def DeviceListGet():
"""L-,W+: Вернуть список аудио устройст (входящих и исходящих, микрофонов и динамиков).
from pyOpenRPA.Robot import Audio
Audio.DeviceListGet()
:return: [{"IndexInt":1, "NameStr": "",
"HostApiInt": 0, "HostApiStr": "MME"|"Windows WASAPI"|"Windows WDM-KS",
"MaxInputChannelsInt": 0, "MaxOutputChannelsInt": 0,
"DefaultSampleRateFloat": 44100.0
},...]
:rtype: list
"""
l_result = []
p = pyaudio.PyAudio()
for i in range(0, p.get_device_count()):
l_info = p.get_device_info_by_index(i)
l_info_dict = {
"IndexInt":l_info["index"],
"NameStr": l_info["name"],
"MaxInputChannelsInt": l_info["maxInputChannels"],
"MaxOutputChannelsInt": l_info["maxOutputChannels"],
"HostApiInt": l_info["hostApi"],
"DefaultSampleRateFloat": l_info["defaultSampleRate"],
"HostApiStr": p.get_host_api_info_by_index(l_info["hostApi"])["name"] #"MME"|"Windows WASAPI"|"Windows WDM-KS"
}
l_result.append(l_info_dict)
return l_result
class Recorder:
mStatusStr = "0_READY"
mAudio = None
mCaptureThread = None
mStream = None
mDeviceInt = None
mChannelCountInt = None
mFramesInt = 512
mRecordedFramesList = []
mUseLoopbackBool = True
mSampleRateInt = None
mSampleSizeInt = None
mCaptureBool = True
mFilePathStr = "out"
mFileFormatStr = "mp3"
def __init__(self, inDeviceInt=None):
self.mDeviceInt = inDeviceInt
if inDeviceInt == None: self.mDeviceInt = DeviceSystemSoundIndex()
def CaptureStart(self, inFilePathStr = "out", inFileFormatStr = "mp3", inDoChunkBool = False):
self.mFilePathStr = inFilePathStr
self.mFileFormatStr = inFileFormatStr
self.mAudio = pyaudio.PyAudio()
self.mSampleSizeInt = self.mAudio.get_sample_size(pyaudio.paInt16)
lDeviceInfoDict = self.mAudio.get_device_info_by_index(self.mDeviceInt)
#Open stream
self.mSampleRateInt = int(lDeviceInfoDict["defaultSampleRate"])
self.mChannelCountInt = lDeviceInfoDict["maxInputChannels"] if (lDeviceInfoDict["maxOutputChannels"] < lDeviceInfoDict["maxInputChannels"]) else lDeviceInfoDict["maxOutputChannels"]
self.mStream = self.mAudio.open(format = pyaudio.paInt16,
channels = self.mChannelCountInt,
rate = self.mSampleRateInt,
input = True,
frames_per_buffer = self.mFramesInt,
input_device_index = lDeviceInfoDict["index"],
as_loopback = self.mUseLoopbackBool)
self.mCaptureThread = threading.Thread(target=self.__Capture__)
self.mCaptureThread.start()
def __Capture__(self):
while self.mCaptureBool==True:
self.mRecordedFramesList.append(self.mStream.read(self.mFramesInt))
self.mStream.stop_stream()
self.mStream.close()
#Close module
self.mAudio.terminate()
def CaptureStop(self):
self.mCaptureBool=False
self.mCaptureThread.join()
self.CaptureChunk()
def CaptureChunk(self):
# Advanced usage, if you have raw audio data:
sound = AudioSegment(
# raw audio data (bytes)
data=b''.join(self.mRecordedFramesList),
# 2 byte (16 bit) samples
sample_width=self.mSampleSizeInt,
# 44.1 kHz frame rate
frame_rate=self.mSampleRateInt,
# stereo
channels=self.mChannelCountInt
)
sound.export(f"{self.mFilePathStr}.{self.mFileFormatStr}", format=f"{self.mFileFormatStr}")
self.mRecordedFramesList = []
def FileListGet(self):
pass
def FileLastGet(self):
pass
def __Callback__(self, inDefList):
pass
def __CallbackIsSilent__(self):
pass
def __CallbackIsChunked__(self):
pass
def __CallbackIsStopped__(self):
pass
def __TriggerCenter__(self):
"""L-,W+: Контроль записи / остановки аудио по следующим критериям:
- Общая длительность,
- Максимальная длительность части,
- Максимальная длит тишины (часть),
- Максимальная длительность тишины (остановка),
"""
pass