
Unclosed file warning using asyncio Python - Stack Overflow


I'm building a Python service to process audio files (MP3/MP4) by splitting them into chunks and transcribing them asynchronously using a REST API. While the code works, I'm getting numerous ResourceWarnings about unclosed files:

ResourceWarning: unclosed file <_io.BufferedRandom name='/var/folders/vw/k_0mpg690rg44m0mw3vj9l2c0000gp/T/tmpoico59vl.mp3'>
handle = None
ResourceWarning: Enable tracemalloc to get the object allocation traceback
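
For completeness, this is roughly how I surface the warnings with an allocation traceback while debugging (a minimal sketch using only the standard warnings and tracemalloc modules, following the hint in the warning text itself; running under python -X dev also makes ResourceWarnings visible):

import tracemalloc
import warnings

# follow the warning's own hint: record allocation tracebacks so each
# ResourceWarning can point at the line that opened the file
tracemalloc.start()
warnings.simplefilter("always", ResourceWarning)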

My code is processing multiple audio chunks in parallel, and despite explicitly closing and unlinking temporary files, I still see these warnings appearing in my logs. Here's the relevant code:

import logging
import aiohttp
import asyncio
import json
import os
import tempfile
import sys
import subprocess

from pydub import AudioSegment

logger = logging.getLogger(__name__)

MODEL_ENDPOINT = (
    "http://example"
)
MODEL_NAME = "example"


def load_audio_file(file_path):
    """Load audio file from disk."""
    logger.info(f"Loading audio file from {file_path}")

    file_extension = os.path.splitext(file_path)[1][1:].lower()
    if file_extension == 'mp3':
        audio = AudioSegment.from_mp3(file_path)
    elif file_extension == 'mp4':
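        # reserve a named temp file for ffmpeg's MP3 output; close the handle
        # right away so ffmpeg can open the path itself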
        temp_mp3 = tempfile.NamedTemporaryFile(delete=False, suffix='.mp3')
        temp_mp3.close()

        try:
            cmd = [
                'ffmpeg', '-i', file_path,
                '-vn',
                '-ar', '44100',
                '-ac', '2',
                '-b:a', '192k',
                '-f', 'mp3',
                '-y',
                temp_mp3.name
            ]
            logger.info(f"MP4 file detected, converting to mp3")
            subprocess.run(
                cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

            audio = AudioSegment.from_mp3(temp_mp3.name)
        finally:
            if os.path.exists(temp_mp3.name):
                os.unlink(temp_mp3.name)
    else:
        raise Exception("Only mp3 and mp4 files supported")

    logger.info(f"Loaded audio file: {len(audio)}ms duration")
    return audio


def split_audio(audio, chunk_duration_ms=29000):
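    # slice the audio into chunk_duration_ms windows, with consecutive
    # chunks overlapping by 500 ms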
    chunks = []
    total_duration_ms = len(audio)

    overlap_ms = 500

    for start_ms in range(0, total_duration_ms, chunk_duration_ms - overlap_ms):
        end_ms = min(start_ms + chunk_duration_ms, total_duration_ms)
        chunk = audio[start_ms:end_ms]
        chunks.append(chunk)

    logger.info(f"Split audio into {len(chunks)} chunks")
    return chunks


async def export_audio_chunk(chunk, chunk_index):
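    # NamedTemporaryFile is only used to reserve a unique path on disk;
    # the handle is closed immediately and pydub exports to the path instead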
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp3')
    temp_file.close()
    
    # run the export in a thread to avoid blocking the event loop
    await asyncio.to_thread(chunk.export, temp_file.name, format="mp3")
    
    def read_file(filename):
        with open(filename, 'rb') as f:
            return f.read()
        
    audio_data = await asyncio.to_thread(read_file, temp_file.name)
    await asyncio.to_thread(os.unlink, temp_file.name)
    
    return audio_data


async def transcribe_chunk_async(session, chunk, chunk_index, api_url):
    try:
        logger.info(f"Preparing chunk {chunk_index+1} for API...")
        audio_data = await export_audio_chunk(chunk, chunk_index)
        
        form_data = aiohttp.FormData()
        form_data.add_field('file', audio_data,
                            filename=f'chunk_{chunk_index}.mp3',
                            content_type='audio/mpeg')
        form_data.add_field('model', MODEL_NAME)
        form_data.add_field('response_format', 'json')
        form_data.add_field('temperature', '0.0')

        logger.info(
            f"Sending chunk {chunk_index+1} ({len(audio_data)/1024:.1f} KB) to API...")

        async with session.post(f"{api_url}/audio/transcriptions", data=form_data) as response:
            if response.status != 200:
                logger.error(
                    f"Error for chunk {chunk_index+1}: HTTP {response.status}")
                text = await response.text()
                logger.error(f"Response: {text[:500]}")
                return chunk_index, None

            try:
                result = await response.json()
                if 'text' in result:
                    logger.info(
                        f"Chunk {chunk_index+1} transcribed: {result['text'][:50]}...")
                return chunk_index, result
            except json.JSONDecodeError:
                logger.error(
                    f"Failed to decode JSON response for chunk {chunk_index+1}")
                text = await response.text()
                logger.error(f"Response content: {text[:500]}...")
                return chunk_index, None
    except Exception as e:
        logger.error(
            f"Exception during transcription of chunk {chunk_index+1}: {e}")
        return chunk_index, None


async def transcribe_audio_async(file_path):
    audio = await asyncio.to_thread(load_audio_file, file_path)
    chunks = await asyncio.to_thread(split_audio, audio)

    connector = aiohttp.TCPConnector(limit=10)
    timeout = aiohttp.ClientTimeout(total=600)

    async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
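        # transcribe all chunks concurrently, sharing one HTTP session
        # (the connector caps this at 10 simultaneous connections)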
        tasks = [
            transcribe_chunk_async(session, chunk, i, MODEL_ENDPOINT)
            for i, chunk in enumerate(chunks)
        ]

        results = await asyncio.gather(*tasks)

    ordered_results = sorted(results, key=lambda x: x[0])
    transcriptions = [r[1]['text'] if r[1]
                      and 'text' in r[1] else "" for r in ordered_results]

    full_transcription = " ".join(transcriptions)

    logger.info(
        f"Transcription complete: {len(full_transcription)} characters")
    return full_transcription


async def transcribe_audio_async_wrapper(file):
    try:
        logger.info(
            f"Creating temporary file for uploaded file {file.filename}")
        temp_file = tempfile.NamedTemporaryFile(
            delete=False, suffix=os.path.splitext(file.filename)[1])
        file.save(temp_file.name)
        temp_file.close()

        transcript = await transcribe_audio_async(temp_file.name)
        await asyncio.to_thread(os.unlink, temp_file.name)

        return transcript
    except Exception as e:
        logger.error(f"Transcription failed: {e}")
        try:
            if 'temp_file' in locals() and os.path.exists(temp_file.name):
                await asyncio.to_thread(os.unlink, temp_file.name)
        except:
            pass
        raise


def transcribe_audios(files):
    async def transcribe_all():
        return await asyncio.gather(*[transcribe_audio_async_wrapper(file) for file in files])

    return asyncio.run(transcribe_all())

The overall flow is:

  1. Load an audio file (MP3 or MP4) with pydub
  2. Split it into overlapping chunks (~29 seconds each)
  3. Process each chunk in parallel (a stripped-down version of this step is sketched below):
  • Export the chunk to a temporary MP3 file
  • Read the file into memory
  • Upload it to the transcription API
  • Delete the temporary file
  4. Combine the results into a single transcription
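
Here is a stripped-down, self-contained version of the per-chunk temp-file handling from step 3, which I believe is enough to trigger the warning on its own; it assumes a local test.mp3 exists and uses the same pydub calls as the full code:

import os
import tempfile
import warnings

from pydub import AudioSegment

warnings.simplefilter("always", ResourceWarning)

# load a local test file (assumed to exist) and take a ~29 s slice,
# mirroring what split_audio() produces
audio = AudioSegment.from_mp3("test.mp3")
chunk = audio[:29000]

# same pattern as export_audio_chunk(): reserve a temp path, close the
# handle, let pydub export to the path, read it back, then unlink it
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
temp_file.close()

chunk.export(temp_file.name, format="mp3")

with open(temp_file.name, "rb") as f:
    audio_data = f.read()

os.unlink(temp_file.name)
print(f"read {len(audio_data)} bytes, temp file removed")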

I'm offloading the blocking work with asyncio.to_thread() and using tempfile.NamedTemporaryFile for the intermediate files, and I explicitly close and unlink each one, yet the ResourceWarnings persist. They appear to be triggered around the chunk.export() call in pydub, though I'm not certain.

I can't see where in my own code a file is left unclosed. Is this happening internally in pydub, in how the AudioSegment handles its files, or am I missing something in how I manage the temporary files?
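
One experiment I'm considering but haven't verified yet: as far as I can tell from pydub's source, AudioSegment.export() opens the output path itself and returns that file object, so explicitly closing the returned handle might be what's missing:

# hypothetical tweak to export_audio_chunk(), not yet tested: if
# chunk.export() really returns the file object it opened for writing,
# closing it explicitly should release the handle deterministically
def export_and_close(chunk, path):
    chunk.export(path, format="mp3").close()

# ...and inside export_audio_chunk(), instead of the current export call:
# await asyncio.to_thread(export_and_close, chunk, temp_file.name)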
