I'm building a Python service to process audio files (MP3/MP4) by splitting them into chunks and transcribing them asynchronously using a REST API. While the code works, I'm getting numerous ResourceWarnings
about unclosed files:
CopyResourceWarning: unclosed file <_io.BufferedRandom name='/var/folders/vw/k_0mpg690rg44m0mw3vj9l2c0000gp/T/tmpoico59vl.mp3'>
handle = None
ResourceWarning: Enable tracemalloc to get the object allocation traceback
My code is processing multiple audio chunks in parallel, and despite explicitly closing and unlinking temporary files, I still see these warnings appearing in my logs. Here's the relevant code:
import logging
import aiohttp
import asyncio
import json
import os
import tempfile
import sys
import subprocess
from pydub import AudioSegment
logger = logging.getLogger(__name__)
MODEL_ENDPOINT = (
"http://example"
)
MODEL_NAME = "example"
def load_audio_file(file_path):
"""Load audio file from disk."""
logger.info(f"Loading audio file from {file_path}")
file_extension = os.path.splitext(file_path)[1][1:].lower()
if file_extension == 'mp3':
audio = AudioSegment.from_mp3(file_path)
elif file_extension == 'mp4':
temp_mp3 = tempfile.NamedTemporaryFile(delete=False, suffix='.mp3')
temp_mp3.close()
try:
cmd = [
'ffmpeg', '-i', file_path,
'-vn',
'-ar', '44100',
'-ac', '2',
'-b:a', '192k',
'-f', 'mp3',
'-y',
temp_mp3.name
]
logger.info(f"MP4 file detected, converting to mp3")
subprocess.run(
cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
audio = AudioSegment.from_mp3(temp_mp3.name)
finally:
if os.path.exists(temp_mp3.name):
os.unlink(temp_mp3.name)
else:
raise Exception("Only mp3 and mp4 files supported")
logger.info(f"Loaded audio file: {len(audio)}ms duration")
return audio
def split_audio(audio, chunk_duration_ms=29000):
chunks = []
total_duration_ms = len(audio)
overlap_ms = 500
for start_ms in range(0, total_duration_ms, chunk_duration_ms - overlap_ms):
end_ms = min(start_ms + chunk_duration_ms, total_duration_ms)
chunk = audio[start_ms:end_ms]
chunks.append(chunk)
logger.info(f"Split audio into {len(chunks)} chunks")
return chunks
async def export_audio_chunk(chunk, chunk_index):
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp3')
temp_file.close()
# run the export in a thread to avoid blocking the event loop
await asyncio.to_thread(chunk.export, temp_file.name, format="mp3")
def read_file(filename):
with open(filename, 'rb') as f:
return f.read()
audio_data = await asyncio.to_thread(read_file, temp_file.name)
await asyncio.to_thread(os.unlink, temp_file.name)
return audio_data
async def transcribe_chunk_async(session, chunk, chunk_index, api_url):
try:
logger.info(f"Preparing chunk {chunk_index+1} for API...")
audio_data = await export_audio_chunk(chunk, chunk_index)
form_data = aiohttp.FormData()
form_data.add_field('file', audio_data,
filename=f'chunk_{chunk_index}.mp3',
content_type='audio/mpeg')
form_data.add_field('model', MODEL_NAME)
form_data.add_field('response_format', 'json')
form_data.add_field('temperature', '0.0')
logger.info(
f"Sending chunk {chunk_index+1} ({len(audio_data)/1024:.1f} KB) to API...")
async with session.post(f"{api_url}/audio/transcriptions", data=form_data) as response:
if response.status != 200:
logger.error(
f"Error for chunk {chunk_index+1}: HTTP {response.status}")
text = await response.text()
logger.error(f"Response: {text[:500]}")
return chunk_index, None
try:
result = await response.json()
if 'text' in result:
logger.info(
f"Chunk {chunk_index+1} transcribed: {result['text'][:50]}...")
return chunk_index, result
except json.JSONDecodeError:
logger.error(
f"Failed to decode JSON response for chunk {chunk_index+1}")
text = await response.text()
logger.error(f"Response content: {text[:500]}...")
return chunk_index, None
except Exception as e:
logger.error(
f"Exception during transcription of chunk {chunk_index+1}: {e}")
return chunk_index, None
async def transcribe_audio_async(file_path):
audio = await asyncio.to_thread(load_audio_file, file_path)
chunks = await asyncio.to_thread(split_audio, audio)
connector = aiohttp.TCPConnector(limit=10)
timeout = aiohttp.ClientTimeout(total=600)
async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
tasks = [
transcribe_chunk_async(session, chunk, i, MODEL_ENDPOINT)
for i, chunk in enumerate(chunks)
]
results = await asyncio.gather(*tasks)
ordered_results = sorted(results, key=lambda x: x[0])
transcriptions = [r[1]['text'] if r[1]
and 'text' in r[1] else "" for r in ordered_results]
full_transcription = " ".join(transcriptions)
logger.info(
f"Transcription complete: {len(full_transcription)} characters")
return full_transcription
async def transcribe_audio_async_wrapper(file):
try:
logger.info(
f"Creating temporary file for uploaded file {file.filename}")
temp_file = tempfile.NamedTemporaryFile(
delete=False, suffix=os.path.splitext(file.filename)[1])
file.save(temp_file.name)
temp_file.close()
transcript = await transcribe_audio_async(temp_file.name)
await asyncio.to_thread(os.unlink, temp_file.name)
return transcript
except Exception as e:
logger.error(f"Transcription failed: {e}")
try:
if 'temp_file' in locals() and os.path.exists(temp_file.name):
await asyncio.to_thread(os.unlink, temp_file.name)
except:
pass
raise
def transcribe_audios(files):
async def transcribe_all():
return await asyncio.gather(*[transcribe_audio_async_wrapper(file) for file in files])
return asyncio.run(transcribe_all())
The overall flow is:
- Load an audio file (MP3 or MP4) with pydub
- Split it into overlapping chunks (~29 seconds each)
- Process each chunk in parallel:
- Export chunk to temporary MP3 file
- Read the file into memory
- Upload to transcription API
- Delete the temporary file
- Combine results into a single transcription
I'm using asyncio.to_thread()
and temporary files for processing, but despite using tempfile.NamedTemporaryFile
and explicitly unlinking files, I still get these ResourceWarnings. The warnings seem to come from the chunk.export()
method in pydub
(?).
I am struggling to see where in my code there is a file that is not being closed, unless it is internal to pydub
and how the AudioSegment
is being handled?