import pyaudio from pydub import AudioSegment import threading import wave def DeviceSystemSoundSearchIndex(): """L-,W+: Выполнить поиск устройства, с помощью которого можно будет выполнить захват аудио, которое поступает из приложений. Например: аудиоконференции Zoom, whatsapp, telegram и т.д. """ pass def DeviceListGet(): """L-,W+: Вернуть список аудио устройст (входящих и исходящих, микрофонов и динамиков). from pyOpenRPA.Robot import Audio Audio.DeviceListGet() :return: [{"IndexInt":1, "NameStr": "", "HostApiInt": 0, "HostApiStr": "MME"|"Windows WASAPI"|"Windows WDM-KS", "MaxInputChannelsInt": 0, "MaxOutputChannelsInt": 0, "DefaultSampleRateFloat": 44100.0 },...] :rtype: list """ l_result = [] p = pyaudio.PyAudio() for i in range(0, p.get_device_count()): l_info = p.get_device_info_by_index(i) l_info_dict = { "IndexInt":l_info["index"], "NameStr": l_info["name"], "MaxInputChannelsInt": l_info["maxInputChannels"], "MaxOutputChannelsInt": l_info["maxOutputChannels"], "HostApiInt": l_info["hostApi"], "DefaultSampleRateFloat": l_info["defaultSampleRate"], "HostApiStr": p.get_host_api_info_by_index(l_info["hostApi"])["name"] #"MME"|"Windows WASAPI"|"Windows WDM-KS" } l_result.append(l_info_dict) return l_result class Recorder: mStatusStr = "0_READY" mAudio = pyaudio.PyAudio() mCaptureThread = None mStream = None mDeviceInt = None mChannelCountInt = None mFramesInt = 512 mRecordedFramesList = [] mUseLoopbackBool = True mSampleRateInt = None mSampleSizeInt = mAudio.get_sample_size(pyaudio.paInt16) mCaptureBool = True mFileNameStr = "aux" mFileFormatStr = "mp3" def __init__(self, inDeviceInt=None): self.mDeviceInt = inDeviceInt def CaptureStart(self): lDeviceInfoDict = self.mAudio.get_device_info_by_index(self.mDeviceInt) #Open stream self.mSampleRateInt = int(lDeviceInfoDict["defaultSampleRate"]) self.mChannelCountInt = lDeviceInfoDict["maxInputChannels"] if (lDeviceInfoDict["maxOutputChannels"] < lDeviceInfoDict["maxInputChannels"]) else lDeviceInfoDict["maxOutputChannels"] self.mStream = self.mAudio.open(format = pyaudio.paInt16, channels = self.mChannelCountInt, rate = self.mSampleRateInt, input = True, frames_per_buffer = self.mFramesInt, input_device_index = lDeviceInfoDict["index"], as_loopback = self.mUseLoopbackBool) self.mCaptureThread = threading.Thread(target=self.__Capture__) self.mCaptureThread.start() def __Capture__(self): while self.mCaptureBool == True: self.mRecordedFramesList.append(self.mStream.read(self.mFramesInt)) self.mStream.stop_stream() self.mStream.close() #Close module self.mAudio.terminate() print("done") def CaptureStop(self): self.mCaptureBool=False self.mCaptureThread.join() print("done2") self.CaptureChunk() print("done3") def CaptureChunk(self): print("CaptureChunk 1") waveFile = wave.open(f"{self.mFileNameStr}.{self.mFileFormatStr}", 'wb') waveFile.setnchannels(self.mChannelCountInt) waveFile.setsampwidth(self.mSampleSizeInt) waveFile.setframerate(self.mSampleRateInt) waveFile.writeframes(b''.join(self.mRecordedFramesList)) waveFile.close() lSound = AudioSegment( # raw audio data (bytes) data=b''.join(self.mRecordedFramesList), # 2 byte (16 bit) samples sample_width=self.mSampleSizeInt, # 44.1 kHz frame rate frame_rate=self.mSampleRateInt, # stereo channels=self.mChannelCountInt ) print("CaptureChunk 2") print(len(self.mRecordedFramesList)) lSound.export(f"{self.mFileNameStr}.{self.mFileFormatStr}", format=self.mFileFormatStr) print("CaptureChunk 3") self.mRecordedFramesList = [] def FileListGet(self): pass def FileLastGet(self): pass def __Callback__(self, inDefList): pass def __CallbackIsSilent__(self): pass def __CallbackIsChunked__(self): pass def __CallbackIsStopped__(self): pass def __TriggerCenter__(self): """L-,W+: Контроль записи / остановки аудио по следующим критериям: - Общая длительность, - Максимальная длительность части, - Максимальная длит тишины (часть), - Максимальная длительность тишины (остановка), """ pass