How can I upload several files with telegram-bot in Python to process them in a queue? - Stack Overflow

This code is a bot that analyzes audio files. It accepts an .mp3 file and offers two kinds of analysis: the length of the track or its genre. When launched, the bot replies to the /start command with a greeting and then waits for a file, which it saves in the temporary folder temp_file. Once the file has arrived, the bot shows a keyboard with two buttons: "genre", which determines the genre (for now just a random choice between "rock" and "hip-hop"), and "length", which calculates the length of the track. After processing, the file is deleted and the user data is cleared. The bot is written in Python:

import random
import os
import librosa
from telegram import ReplyKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, filters


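TOKEN = "TOKEN"  # placeholder; put the real bot token here

GENRES = ["rock", "hip-hop"]  # stand-in labels until real genre detection exists
TEMP_DIR = "temp_file"  # folder for downloaded files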

os.makedirs(TEMP_DIR, exist_ok=True)

# The button labels must match the regex registered in main()
CLASSIFY_KEYBOARD = ReplyKeyboardMarkup([["genre", "length"]], resize_keyboard=True, one_time_keyboard=True)

# Handler callbacks receive both the update and the context
async def start(update, context):
    await update.message.reply_text("Hi! Send me a file and I will process it")

async def handle_file(update, context):
    file = await get_user_file(update)
    if not file:
        return

    # Only one path is kept per user, so a second upload overwrites the first
    context.user_data["file_path"] = file
    await update.message.reply_text("Choose analysis type:", reply_markup=CLASSIFY_KEYBOARD)

async def get_user_file(update):
    if update.message.document:
        if not update.message.document.file_name.lower().endswith('.mp3'):
            await update.message.reply_text("Send .mp3 file!")
            return None
        file = await update.message.document.get_file()
    else:
        await update.message.reply_text("Send file first")
        return None

    # Save the file under its Telegram file_id inside TEMP_DIR
    file_path = os.path.join(TEMP_DIR, file.file_id + ".mp3")
    await file.download_to_drive(file_path)
    return file_path

async def handle_classify_choice(update, context):
    if "file_path" not in context.user_data:
        await update.message.reply_text("Send file first")
        return

    choice = update.message.text
    file_path = context.user_data["file_path"]

    if choice == "length":
        length = await get_length(file_path)
        await update.message.reply_text(f"Length is: {length:.1f} seconds")
    elif choice == "genre":
        result = random.choice(GENRES)
        await update.message.reply_text(f"It is: {result}")

    # Clean up: delete the temporary file and forget the stored path
    if os.path.exists(file_path):
        os.remove(file_path)
    context.user_data.clear()

async def get_length(file_path):
    # librosa.load decodes the whole file synchronously, so this call
    # blocks the event loop while it runs
    y, sr = librosa.load(file_path, sr=None)
    return len(y) / sr  # number of samples / sample rate = seconds

def main():
    application = Application.builder().token(TOKEN).build()

    application.add_handler(CommandHandler("start", start))
    application.add_handler(MessageHandler(filters.Document.FileExtension("mp3"), handle_file))
    application.add_handler(MessageHandler(filters.Regex("^(genre|length)$"), handle_classify_choice))
    application.run_polling()
    application.run_polling()

if __name__ == '__main__':
    main()
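
The code above keeps a single path under context.user_data["file_path"], so each new upload replaces the previous one. One possible way to accept several files is to store the paths in a per-user queue instead. The sketch below is only an illustration under that assumption: the key name file_queue and the helpers enqueue_file and next_file are hypothetical and not part of the original bot, and a plain dict stands in for context.user_data so the snippet runs without Telegram.

```python
from collections import deque

QUEUE_KEY = "file_queue"  # hypothetical key name, not used in the original code


def enqueue_file(user_data, file_path):
    """Append a downloaded file path to the user's queue; return the queue size."""
    queue = user_data.setdefault(QUEUE_KEY, deque())
    queue.append(file_path)
    return len(queue)


def next_file(user_data):
    """Pop the oldest queued path, or return None when nothing is waiting."""
    queue = user_data.get(QUEUE_KEY)
    if not queue:
        return None
    return queue.popleft()


# A plain dict standing in for context.user_data
user_data = {}
enqueue_file(user_data, "temp_file/a.mp3")
enqueue_file(user_data, "temp_file/b.mp3")

print(next_file(user_data))  # temp_file/a.mp3
print(next_file(user_data))  # temp_file/b.mp3
print(next_file(user_data))  # None
```

In the bot, handle_file could call enqueue_file(context.user_data, file) instead of assigning file_path, and handle_classify_choice could take the next path with next_file(context.user_data). The final context.user_data.clear() would then have to go, or run only once the queue is empty, because it would otherwise discard the files still waiting.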