Pyaudio是Python中用于音频输入和输出的库。其底层基于跨平台音频 I/O 库
PortAudio,能够实现音频的录制、播放和实时处理功能。通过 PyAudio,开发者可以轻松地在不同操作系统(如Windows、Linux、macOS)上进行音频流的创建与管理。
安装Pyaudio
在开始之前,我们需要先安装Pyaudio库。可以使用pip来安装Pyaudio:
pip install pyaudio
*若pip可正常安装请忽视此内容。如果您使用的是 Windows,则在尝试安装 Pyaudio 时可能会遇到一些错误。为了修复此问题和“Pyaudio 模块未找到错误”,您需要在命令提示符中执行以下 2 个步骤。
1. pip install pipwin
2. pipwin install pyaudio
基本概念
(一)声道(Channels):音频信号的独立路径,常见的有单声道(1 个声道)和立体声(2 个声道)。
(二)采样率(Sample Rate):每秒采集音频样本的数量,常见的采样率有 44100 Hz、48000 Hz 等。
(三)采样宽度(Sample Width):每个音频样本的位数,常见的有 8 位、16 位、32 位等。
(四)音频格式(Format):PyAudio 支持多种音频格式,如 paInt8、paInt16、paFloat32 等
1.pyaudio浏览设备中可用的音频设备
import pyaudio
p = pyaudio.PyAudio()
# 获取主机API信息
info = p.get_host_api_info_by_index(0)
# print(info)
numdevices = info.get('deviceCount')
# print(numdevices)
# 遍历所有设备
for i in range(0, numdevices):
device_info = p.get_device_info_by_host_api_device_index(0, i)
device_name = device_info.get('name')
max_input_channels = device_info.get('maxInputChannels')
max_output_channels = device_info.get('maxOutputChannels')
if max_input_channels > 0:
print(f"输入设备ID: {i}, 名称: {device_name}")
if max_output_channels > 0:
print(f"输出设备ID: {i}, 名称: {device_name}")
✅ 小结: 通过 PyAudio
实例可以枚举所有音频设备,并识别其类型(输入/输出)。这在实际应用中用于选择合适的录音或播放设备,是音频程序设备管理的必备步骤。
2.使用pyaudio录制音频
import pyaudio,wave
def record_audio(filename, duration, sample_rate=44100, channels=2, chunk=1024):
"""
filename:保存音频的文件名
duration:录制时长(秒)
sample_rate=44100:采样率,默认44.1kHz(CD音质)
channels=2:声道数,默认立体声(2声道)
chunk=1024:每次读取的音频数据块大小
"""
audio_format = pyaudio.paInt16 #使用16位整数格式存储音频数据
audio = pyaudio.PyAudio() #创建PyAudio对象来管理音频流
stream = audio.open(format=audio_format,
channels=channels,
rate=sample_rate,
input=True, #设置为输入设备(麦克风)
frames_per_buffer=chunk)
frames = [] #用于存储所有录制的音频数据块
print("开始录制音频...")
for i in range(0, int(sample_rate / chunk * duration)):
data = stream.read(chunk)
frames.append(data)
print("录制完成!")
stream.stop_stream() #停止音频流
stream.close() #关闭音频流
audio.terminate() #终止PyAudio对象,释放资源
wf = wave.open(filename, 'wb') #以二进制写入模式打开WAV文件
wf.setnchannels(channels) #设置声道数、采样宽度(字节数)、采样率
wf.setsampwidth(audio.get_sample_size(audio_format))
wf.setframerate(sample_rate)
wf.writeframes(b''.join(frames)) #将所有数据块合并并写入文件
wf.close() #关闭文件
record_audio("output.wav", duration=5)
✅ 小结: 本章实现了完整的音频录制功能,包括音频流的打开、数据读取、帧存储和 WAV
文件写入。录制过程中需合理设置参数,并注意资源的正确释放。
3.pyaudio播放音频
import pyaudio
import wave
import os
basedir = os.path.dirname(__file__)
def play_audio(filename):
audio = pyaudio.PyAudio() #创建PyAudio对象来管理音频流
wf = wave.open(filename, 'rb') #以二进制读取模式打开WAV文件
stream = audio.open(
format=audio.get_format_from_width(wf.getsampwidth()),
channels=wf.getnchannels(),
rate=wf.getframerate(),
output=True
)
"""
format:根据WAV文件的采样宽度自动确定音频格式
channels:使用WAV文件中的声道数
rate:使用WAV文件中的采样率
output=True:设置为输出设备(扬声器/耳机)
"""
chunk = 1024 #每次读取和播放的数据块大小
data = wf.readframes(chunk) #从WAV文件中读取指定数量的音频帧
print("开始播放音频...")
while data:
stream.write(data) #将音频数据写入输出流进行播放
data = wf.readframes(chunk)
print("播放完成!")
stream.stop_stream() #停止、关闭音频流 终止PyAudio对象,释放系统音频资源
stream.close()
audio.terminate()
# 使用示例
play_audio(os.path.join(basedir, "music.wav"))
✅ 小结: 通过打开 WAV
文件并读取其格式信息,我们可以配置输出流并将音频数据写入扬声器。播放过程中需注意数据块的连续读取与写入,确保流畅播放。
4.pyaudio 实时监听音频中继
import pyaudio
CHUNK = 1024 # 每次读取的音频数据块大小(采样点数)
WIDTH = 2 # 每个采样点的字节数(2字节=16位)
CHANNELS = 2 # 立体声(2个声道)
RATE = 44100 # 采样率(44.1kHz,CD音质)
RECORD_SECONDS = 10 # 录制时长
p = pyaudio.PyAudio()
stream = p.open(
format=p.get_format_from_width(WIDTH), # 音频格式(16位)
channels=CHANNELS, # 声道数
rate=RATE, # 采样率
input=True, # 启用输入(录音)
output=True, # 启用输出(播放)
frames_per_buffer=CHUNK # 缓冲区大小
)
print("* recording")
for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
data = stream.read(CHUNK) # 从输入设备读取音频数据
stream.write(data, CHUNK) # 将相同数据写入输出设备
print("* done")
stream.stop_stream() # 停止音频流
stream.close() # 关闭音频流
p.terminate() # 终止PyAudio
✅ 小结: 实时监听是一种常见的音频处理模式,适用于监听、对讲等场景。通过同时打开输入和输出流,并直接传递数据,实现低延迟的音频中继
5.pyaudio 回调方法播放
"""PyAudio Example: Play a wave file (callback version)"""
import pyaudio
import wave
import time
import sys
if len(sys.argv) < 2:
print("Plays a wave file.\n\nUsage: %s output.wav" % sys.argv[0])
sys.exit(-1)
wf = wave.open(sys.argv[1], 'rb')
p = pyaudio.PyAudio()
def callback(in_data, frame_count, time_info, status):
data = wf.readframes(frame_count) # 从WAV文件读取指定帧数的数据
return data, pyaudio.paContinue
stream = p.open(
format=p.get_format_from_width(wf.getsampwidth()), # 从WAV文件获取采样宽度
channels=wf.getnchannels(), # 从WAV文件获取声道数
rate=wf.getframerate(), # 从WAV文件获取采样率
output=True, # 只输出模式
stream_callback=callback # 指定回调函数
)
stream.start_stream() # 启动音频流(开始播放)
while stream.is_active():
time.sleep(0.1) # 主线程休眠,等待播放完成
stream.stop_stream()
stream.close()
wf.close()
p.terminate()
✅ 小结: 使用回调函数可以实现更高效的音频流管理,避免主线程阻塞。回调函数在需要新数据时自动被调用,适合长时间播放或实时处理。
6.pyaudio 非阻塞回调
"""
PyAudio Example: Make a wire between input and output (i.e., record a
few samples and play them back immediately).
This is the callback (non-blocking) version.
"""
import pyaudio
import time
WIDTH = 2 # 每个样本的字节数(2字节=16位)
CHANNELS = 2 # 立体声(2个声道)
RATE = 44100 # 采样率(44.1kHz,CD音质)
# MAX_DURATION = 10 # 最大运行10秒
p = pyaudio.PyAudio()
# start_time = time.time()
def callback(in_data, frame_count, time_info, status):
# if time.time() - start_time > MAX_DURATION:
# return in_data, pyaudio.paComplete
return in_data, pyaudio.paContinue
stream = p.open(
format=p.get_format_from_width(WIDTH),
channels=CHANNELS,
rate=RATE,
input=True, # 启用输入(录音)
output=True, # 启用输出(播放)
stream_callback=callback # 使用回调模式
)
stream.start_stream() # 启动音频流
print('Stream is active')
while stream.is_active():
time.sleep(0.1) # 主线程休眠,让回调函数处理音频
print('Stream close')
stream.stop_stream()
stream.close()
p.terminate()
✅ 小结:
非阻塞回调模式适用于需要长时间运行或需与其他程序交互的音频应用。通过回调函数返回 paContinue 或 paComplete 控制流的状态。
7.使用pyaudio和pyqt6制作一个录音机
import sys
import os
from PyQt6.QtCore import QThread, pyqtSignal, Qt
from PyQt6.QtWidgets import *
class MainWindow(QWidget):
def __init__(self):
super().__init__()
self.name = None
main_jm = QVBoxLayout()
bu = QVBoxLayout()
bu.addWidget(self.button())
out = QVBoxLayout()
out.addWidget(self.out_put())
inp = QVBoxLayout()
inp.addWidget(self.in_put())
main_jm.addLayout(bu)
main_jm.addLayout(out)
main_jm.addLayout(inp)
self.setLayout(main_jm)
self.setWindowTitle("录音机项目")
self.resize(800, 600)
def button(self):
group = QGroupBox("按钮")
group.setAlignment(Qt.AlignmentFlag.AlignCenter)
layout = QHBoxLayout()
self.b1 = QPushButton("开启录音")
self.b1.setMinimumHeight(100)
self.b1.setMaximumWidth(100)
self.b1.clicked.connect(self.button_1)
self.b1.setEnabled(True)
self.b2 = QPushButton("播放音频")
self.b2.setMinimumHeight(100)
self.b2.setMaximumWidth(100)
self.b2.clicked.connect(self.button_2)
self.b2.setEnabled(True)
self.b3 = QPushButton("中断操作")
self.b3.setMinimumHeight(100)
self.b3.setMaximumWidth(100)
self.b3.clicked.connect(self.stop_operation)
self.b3.setEnabled(False)
layout.addWidget(self.b1)
layout.addWidget(self.b2)
layout.addWidget(self.b3)
group.setLayout(layout)
return group
def in_put(self):
group = QGroupBox("input")
group.setAlignment(Qt.AlignmentFlag.AlignCenter)
layout = QHBoxLayout()
self.in_1 = QLabel("输入需要保存/播放的文件名(无需添加后缀):")
layout.addWidget(self.in_1)
self.in_2 = QLineEdit()
self.in_2.setReadOnly(False)
self.in_2.setText("output")
layout.addWidget(self.in_2)
self.in_2_1 = QLabel(".wav")
layout.addWidget(self.in_2_1)
self.in_3 = QPushButton("确认")
self.in_3.clicked.connect(self.button_in)
layout.addWidget(self.in_3)
group.setLayout(layout)
return group
def button_in(self):
text = self.in_2.text()
self.out_1.append(f"选择的文件内容已改为:{text}.wav")
self.name = (text+".wav")
pass
def out_put(self):
group = QGroupBox("Output")
group.setAlignment(Qt.AlignmentFlag.AlignCenter)
layout = QVBoxLayout()
self.out_1 = QTextEdit()
self.out_1.setReadOnly(True)
self.out_1.setPlaceholderText("此处将显示输出内容...")
layout.addWidget(self.out_1)
self.out_2 = QLabel("大家好,我是练习时长两年半的开发者。若录音或播放时出现异常,请在Windows设置中切换默认使用设备")
self.out_2.setAlignment(Qt.AlignmentFlag.AlignCenter)
layout.addWidget(self.out_2)
group.setLayout(layout)
return group
def button_1(self):
self.b1.setDisabled(True)
self.b1.setText("录音运行中")
self.b3.setEnabled(True) # 启用中断按钮
self.luyin = audio_luyin(self.name)
self.luyin.mess_sign.connect(self.update_output)
self.luyin.finished.connect(self.on_recording_finished)
self.luyin.start()
def on_recording_finished(self):
"""录音完成后的回调函数"""
self.enable_button(1)
# 录音完成后自动播放
self.b2.setDisabled(True)
self.b2.setText("播放运行中")
self.b3.setEnabled(True) # 启用中断按钮
self.bofang = audio_bofang("music.wav") # 传入录音文件路径
self.bofang.mess_sign.connect(self.update_output)
self.bofang.finished.connect(lambda: self.enable_button(2))
self.bofang.start()
def button_2(self):
self.b2.setDisabled(True)
self.b2.setText("播放运行中")
self.b3.setEnabled(True) # 启用中断按钮
self.bofang = audio_bofang(self.name)
self.bofang.mess_sign.connect(self.update_output)
self.bofang.finished.connect(lambda :(self.enable_button(2)))
self.bofang.start()
def stop_operation(self):
"""中断正在运行的操作"""
if hasattr(self, 'luyin') and self.luyin.isRunning():
self.luyin.stop_recording = True # 设置停止标志
self.out_1.append("正在中断录音...")
self.out_1.append("你干嘛,诶哟")
if hasattr(self, 'bofang') and self.bofang.isRunning():
self.bofang.stop_playing = True # 设置停止标志
self.out_1.append("正在中断播放...")
def enable_button(self,text):
self.b3.setEnabled(False)
if text == 1:
self.b1.setEnabled(True)
self.b1.setText("开启录音")
if text == 2:
self.b2.setEnabled(True)
self.b2.setText("播放音频")
def update_output(self,message):
self.out_1.append(message)
class audio_bofang(QThread):
finished = pyqtSignal()
mess_sign = pyqtSignal(str)
def __init__(self, audio_file=None):
super().__init__()
self.stop_playing = False
self.audio_file = audio_file
def set_audio_file(self, audio_file):
"""设置音频文件路径"""
self.audio_file = audio_file
def run(self):
import pyaudio
import wave
import os
# 如果没有设置音频文件,使用默认路径
if self.audio_file is None:
basedir = os.path.dirname(__file__)
self.audio_file = os.path.join(basedir, "output.wav")
# 检查文件是否存在
if not os.path.exists(self.audio_file):
error_msg = f"音频文件不存在: {self.audio_file}"
print(error_msg)
self.mess_sign.emit(error_msg)
self.finished.emit()
return
def play_audio(filename):
audio = pyaudio.PyAudio() # 创建PyAudio对象来管理音频流
wf = wave.open(filename, 'rb') # 以二进制读取模式打开WAV文件
stream = audio.open(
format=audio.get_format_from_width(wf.getsampwidth()),
channels=wf.getnchannels(),
rate=wf.getframerate(),
output=True
)
chunk = 1024 # 每次读取和播放的数据块大小
data = wf.readframes(chunk) # 从WAV文件中读取指定数量的音频帧
print(f"开始播放音频: {os.path.basename(filename)}")
self.mess_sign.emit(f"开始播放音频: {os.path.basename(filename)}")
while data and not self.stop_playing:
stream.write(data) # 将音频数据写入输出流进行播放
data = wf.readframes(chunk)
if self.stop_playing:
print("播放被中断")
self.mess_sign.emit("播放被中断")
else:
print("播放完成!")
self.mess_sign.emit("播放完成!")
stream.stop_stream() # 停止、关闭音频流 终止PyAudio对象,释放系统音频资源
stream.close()
audio.terminate()
play_audio(self.audio_file)
self.finished.emit()
class audio_luyin(QThread):
finished = pyqtSignal()
mess_sign = pyqtSignal(str)
def __init__(self, filename=None, duration=5, sample_rate=44100, channels=2, chunk=1024):
super().__init__()
self.stop_recording = False
self.filename = filename
self.duration = duration
self.sample_rate = sample_rate
self.channels = channels
self.chunk = chunk
if self.filename is None:
basedir = os.path.dirname(__file__)
self.filename = os.path.join(basedir, "output.wav")
# 检查文件是否存在
if not os.path.exists(self.filename):
error_msg = f"音频文件不存在: {self.filename}"
print(error_msg)
self.mess_sign.emit(error_msg)
self.finished.emit()
return
def run(self):
import pyaudio, wave
def record_audio():
audio_format = pyaudio.paInt16 # 使用16位整数格式存储音频数据
audio = pyaudio.PyAudio() # 创建PyAudio对象来管理音频流
stream = audio.open(format=audio_format,
channels=self.channels,
rate=self.sample_rate,
input=True, # 设置为输入设备(麦克风)
frames_per_buffer=self.chunk)
frames = [] # 用于存储所有录制的音频数据块
print("开始录制音频...")
self.mess_sign.emit("开始录制音频...")
self.mess_sign.emit(f"录制音频时长:{self.duration}")
total_chunks = int(self.sample_rate / self.chunk * self.duration)
for i in range(total_chunks):
if self.stop_recording: # 检查停止标志
break
data = stream.read(self.chunk)
frames.append(data)
if self.stop_recording:
self.mess_sign.emit("录音已中断")
else:
self.mess_sign.emit("录制完成!")
stream.stop_stream() # 停止音频流
stream.close() # 关闭音频流
audio.terminate() # 终止PyAudio对象,释放资源
wf = wave.open(self.filename, 'wb') # 以二进制写入模式打开WAV文件
wf.setnchannels(self.channels) # 设置声道数、采样宽度(字节数)、采样率
wf.setsampwidth(audio.get_sample_size(audio_format))
wf.setframerate(self.sample_rate)
wf.writeframes(b''.join(frames)) # 将所有数据块合并并写入文件
wf.close() # 关闭文件
record_audio()
self.finished.emit()
if __name__ == '__main__':
app = QApplication(sys.argv)
window = MainWindow()
window.show()
app.exec()
✅ 小结:
本章综合运用 PyAudio 和 PyQt6,实现了一个具备录音、播放、中断功能的图形化录音机。通过多线程避免界面卡顿,并通过信号机制实现界面与逻辑的分离,展示了音频编程在实际项目中的应用。
comment 评论区
star_outline 咱快来抢个沙发吧!