最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

python - Cause of irresponsiblity of Celery task - Stack Overflow

programmeradmin3浏览0评论

I have a task for caching and receiving weather status of major cities in the world. I use RabbitMQ as a broker and use Redis for caching weather data, then I wrote a function to set weather data in Redis and schedule it with Celery. When I ran it alone it works but when I ran it with Celery scheduler didn't work.

It's my task.py

from celery import Celery
from weatherAPI import MajorCityWeather
# Create a Celery instance
app = Celery('tasks', broker='amqp://guest:guest@localhost//', broker_heartbeat=600,broker_connection_retry_on_startup=True)

# # Configure the Celery beat schedule
a=MajorCityWeather(base_url=";)
@app.task
def print_time():
    a.fetch_major_city_weather()
    print('ok') 
app.conf.beat_schedule = {
    'print-time-every-80-seconds': {
        'task': 'tasks.print_time',
        'schedule': 80,  # Run every 10 seconds
        
    },
}

app.conf.timezone = 'UTC'

it's my WeatherAPI.py

from requests import get, RequestException
from typing import Tuple, Dict, Optional
from producer import RPCClient
from datetime import datetime
from pybreaker import CircuitBreaker
from ast import literal_eval


class WeatherCodes:
    """
    A class that provides weather condition descriptions based on weather codes.

    The `codes` dictionary contains weather codes and their corresponding descriptions.
    """
    codes = {
        0: 'Clear sky', 1: 'Mainly clear', 2: 'Partly cloudy', 3: 'Overcast',
        45: 'Fog', 48: 'Depositing rime fog', 51: 'Light Drizzle', 53: 'Moderate Drizzle',
        55: 'Dense intensity Drizzle', 56: 'Freezing Drizzle Light', 57: 'Freezing Drizzle dense intensity',
        61: 'Rain Slight', 63: 'Moderate rain', 65: 'Heavy intensity rain', 66: 'Freezing rain light',
        67: 'Freezing rain heavy intensity', 71: 'Snow fall slight', 73: 'Moderate Snow fall',
        75: 'Heavy intensity snow fall', 77: 'Snow grains', 80: 'Slight rain showers',
        81: 'Moderate rain showers', 82: 'Violent rain showers', 85: 'Snow showers slight',
        86: 'Snow showers heavy', 95: 'Thunderstorm: Slight or moderate', 96: 'Thunderstorm with slight hail',
        99: 'Thunderstorm with heavy hail'
    }

    @classmethod
    def get_condition(cls, code: int) -> str:
        """
        Get the weather condition description based on the weather code.

        :param code: An integer representing the weather code.
        :return: A string description of the weather condition.
        """
        return cls.codes.get(code, 'Unknown condition')

    @classmethod
    def get_weather_code(cls, condition: str) -> int:
        """
        Get the weather code for a given condition.

        :param condition: A string representing the weather condition.
        :return: An integer representing the weather code for the given condition.
        """
        for code, desc in cls.codes.items():
            if desc == condition:
                return code
        return -1  # Return -1 or some other value to indicate "Unknown condition"


class WeatherService:
    """
    A service class to fetch weather information from an external API and handle caching.

    This class interacts with the weather API, stores the results in a cache (using Redis),
    and implements a circuit breaker to prevent repeated failures from affecting performance.
    """
    circuit_breaker = CircuitBreaker(fail_max=3, reset_timeout=60)

    def __init__(self, base_url: str):
        """
        Initialize the WeatherService with the base URL of the weather API.

        :param base_url: The base URL for the weather API.
        """
        self.base_url = base_url
        self.rpc_client = RPCClient()
        self.error_message = {"message": "Weather data unavailable"}

    @circuit_breaker
    def _make_request(self, latitude: str, longitude: str) -> Tuple[Optional[Dict], Optional[str]]:
        """
        Fetch the current weather from the weather API for the given latitude and longitude.

        :param latitude: The latitude of the location.
        :param longitude: The longitude of the location.
        :return: A tuple containing weather data or None if there is an error, and an error message (if applicable).
        """
        try:
            response = get(self.base_url, params={
                'latitude': latitude,
                'longitude': longitude,
                'current': 'temperature_2m,relative_humidity_2m,is_day,weather_code,wind_speed_10m',
                'timezone': 'UTC',
                'timeformat': 'unixtime'
            })

            data = response.json()
            weather_info = {
                'temperature': data['current']['temperature_2m'],
                'condition': data['current']['weather_code'],
                'humidity': data['current']['relative_humidity_2m'],
                'wind_speed': data['current']['wind_speed_10m'],
                'time': data['current']['time']
            }
            return weather_info, None

        except RequestException as e:
            return None, str(e)

    def is_data_fresh(self, result: Dict, current_unix_timestamp: int) -> bool:
        """
        Check if the cached data is still valid (within 1 hour).

        :param result: The cached weather data.
        :param current_unix_timestamp: The current time as a Unix timestamp.
        :return: True if the cached data is fresh, False otherwise.
        """
        difference = abs(current_unix_timestamp - int(result['time']))
        hours = difference // 3600
        return hours == 0

    def _handle_request_with_circuit_breaker(self, latitude: float, longitude: float):
        """
        Handle a weather request using the circuit breaker. If the request fails, it returns a fallback error message.

        :param latitude: The latitude of the location.
        :param longitude: The longitude of the location.
        :return: Weather data if successful, or an error message if the request fails.
        """
        weather_info, error = self._make_request(
            latitude=latitude, longitude=longitude)

        if weather_info:
            return weather_info
        else:
            return self.error_message

    def get_data(self, key: str) -> Dict:
        """
        Get cached weather data from the Redis store.

        :param key: The Redis key for the cached data.
        :return: The cached weather data.
        """
        result = self.rpc_client.call('hget', key)
        return result

    def set_data(self, key: str, values: str) -> None:
        """
        Set weather data in the Redis store.

        :param key: The Redis key to store the data under.
        :param values: The data to store in Redis.
        """
        result = self.rpc_client.call('hset', key, values)
        return result

    def Is_key_exists(self, key: str) -> bool:
        """
        Check if a cache key exists in Redis.

        :param key: The Redis key to check.
        :return: True if the key exists, False otherwise.
        """
        exists = self.rpc_client.call('kexists', key)
        if exists['exists']:
            return True
        else:
            return False

    def request_weather(self, latitude: float, longitude: float):
        """
        Fetch the weather either from the cache or by requesting from the API.

        :param latitude: The latitude of the location.
        :param longitude: The longitude of the location.
        :return: The weather data, either from the cache or freshly fetched.
        """
        if self.Is_key_exists(f'({latitude},{longitude})'):
            current_unix_timestamp = int(datetime.now().timestamp())
            result = self.get_data(f'({latitude},{longitude})')
            data = (result['data'])

            if not (str(self.error_message) == str(data)) and self.is_data_fresh(data, current_unix_timestamp):
                return data
            else:
                return self._handle_request_with_circuit_breaker(latitude, longitude)
        else:
            return self._handle_request_with_circuit_breaker(latitude, longitude)

    def is_valid_coordinates(self, latitude: float, longitude: float) -> bool:
        """
        Validate the latitude and longitude values.

        :param latitude: The latitude to check.
        :param longitude: The longitude to check.
        :return: True if the coordinates are valid, False otherwise.
        """
        return -90 <= latitude <= 90 and -180 <= longitude <= 180

    def lat_long_request(self, latitude: float, longitude: float):
        """
        Handle a request for weather data at a given latitude and longitude, either from cache or API.

        :param latitude: The latitude of the location.
        :param longitude: The longitude of the location.
        """
        if self.Is_key_exists(f'({latitude},{longitude})'):
            current_unix_timestamp = int(datetime.now().timestamp())
            result = self.get_data(f'({latitude},{longitude})')
            data = literal_eval(str(result['data']))
            if not (str(self.error_message) == str(data)) and self.is_data_fresh(data, current_unix_timestamp):
                return self.get_data(f'({latitude},{longitude})')
            else:
                weather_info = self.request_weather(
                    latitude=latitude, longitude=longitude)
                self.set_data(
                    key=f'({latitude},{longitude})', values=weather_info)
        else:
            weather_info = self.request_weather(
                latitude=latitude, longitude=longitude)
            if not (str(self.error_message) == str(weather_info)):
                self.set_data(
                    key=f'({latitude},{longitude})', values=weather_info)


class MajorCityWeather:
    """
    A class that manages weather data for major cities across various countries.

    This class retrieves and stores weather data for predefined major cities in a cache.
    """
    def __init__(self, base_url: str):
        """
        Initialize the cities and weather service.

        :param base_url: The base URL for the weather API.
        """
        self.lat_long = {
            'USA': {'New York': {'Latitude': 40.7128, 'Longitude': -74.0060}},
            'UK': {'London': {'Latitude': 51.5074, 'Longitude': -0.1278}},
            'Japan': {'Tokyo': {'Latitude': 35.6762, 'Longitude': 139.6503}},
            'France': {'Paris': {'Latitude': 48.8566, 'Longitude': 2.3522}},
            'Iran': {'Tehran': {'Latitude': 35.6892, 'Longitude': 51.3890}}
        }
        self.openmeteo = WeatherService(base_url=base_url)
        self.key = 'major_city_weather_info'

    def request_major_city_weather(self) -> dict:
        """
        Fetch and store weather data for major cities.

        :return: A dictionary with weather data for all major cities.
        """
        major_city_weather_info = {}
        for country, cities in self.lat_long.items():
            for city, coords in cities.items():
                latitude = coords['Latitude']
                longitude = coords['Longitude']
                weather_data = self.openmeteo.request_weather(
                    latitude=latitude, longitude=longitude)
                major_city_weather_info[country] = str({city: weather_data})
        return dict(major_city_weather_info)

    def get_city_weather(self):
        """
        Retrieve cached weather data for major cities.

        :return: The cached weather data for major cities.
        """
        return self.openmeteo.get_data(key=self.key)

    def set_city_weather(self):
        """
        Fetch and store fresh weather data for major cities.

        This updates the cached weather data for all major cities.
        """
        req = self.request_major_city_weather()
        if not str(req) == str({'USA': "{'New York': {'message': 'Weather data unavailable'}}", 'UK': "{'London': {'message': 'Weather data unavailable'}}", 'Japan': "{'Tokyo': {'message': 'Weather data unavailable'}}", 'France': "{'Paris': {'message': 'Weather data unavailable'}}", 'Iran': "{'Tehran': {'message': 'Weather data unavailable'}}"}):
            self.openmeteo.set_data(key=self.key, values=dict(req))

    def fetch_major_city_weather(self):
        """
        Fetch the weather data for major cities, checking if the data is fresh.

        If the data is outdated or unavailable, it will refresh the cache.
        """
        if self.openmeteo.Is_key_exists(self.key):
            current_unix_timestamp = int(datetime.now().timestamp())
            result = literal_eval(str(self.openmeteo.get_data(self.key)))
            data = dict(literal_eval(result['data']['Iran'])['Tehran'])
            if not str(data) == str(self.openmeteo.error_message) and result['status'] == 'success' and self.openmeteo.is_data_fresh(data, current_unix_timestamp):
                return result['data']
            else:
                self.set_city_weather()
        else:
            self.set_city_weather()

it's producer.py

import pika
import json
import uuid
from conf import rabbitmq_conf

class RPCClient:
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_conf['host'],heartbeat=600))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=rabbitmq_conf['queue'][0],durable=True)  # Declare the RPC queue
        self.response = None
        self.corr_id = None

        # Declare a temporary queue for responses
        result = self.channel.queue_declare(queue=rabbitmq_conf['queue'][1], durable=True)        
        self.callback_queue = result.method.queue
        self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True)

    def on_response(self, ch, method, properties, body):
        if self.corr_id == properties.correlation_id:
            self.response = body

    def call(self, operation, key, values=None):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        message = {'operation': operation, 'key': key, 'values': values}
        self.channel.basic_publish(
            exchange=rabbitmq_conf['exchange'],
            routing_key=rabbitmq_conf['queue'][0],
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=json.dumps(message)
        )
        while self.response is None:
            self.connection.process_data_events()
        return json.loads(self.response)

When I ran my Celery worker and Celery beats, they are irresponsible and i got this error :

Traceback (most recent call last):
  File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/celery/app/trace.py", line 453, in trace_task
    R = retval = fun(*args, **kwargs)
                 ^^^^^^^^^^^^^^^^^^^^
  File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/celery/app/trace.py", line 736, in __protected_call__
    return self.run(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aryan/Documents/weather_task/weather_service/tasks/tasks.py", line 13, in print_time
    a.fetch_major_city_weather()
  File "/home/aryan/Documents/weather_task/weather_service/tasks/weatherAPI.py", line 289, in fetch_major_city_weather
    self.set_city_weather()
  File "/home/aryan/Documents/weather_task/weather_service/tasks/weatherAPI.py", line 270, in set_city_weather
    req = self.request_major_city_weather()
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aryan/Documents/weather_task/weather_service/tasks/weatherAPI.py", line 251, in request_major_city_weather
    weather_data = self.openmeteo.request_weather(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aryan/Documents/weather_task/weather_service/tasks/weatherAPI.py", line 170, in request_weather
    if self.Is_key_exists(f'({latitude},{longitude})'):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aryan/Documents/weather_task/weather_service/tasks/weatherAPI.py", line 156, in Is_key_exists
    exists = self.rpc_client.call('kexists', key)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aryan/Documents/weather_task/weather_service/tasks/producer.py", line 37, in call
    self.connection.process_data_events()
  File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/adapters/blocking_connection.py", line 845, in process_data_events
    self._flush_output(timer.is_ready, common_terminator)
  File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/adapters/blocking_connection.py", line 515, in _flush_output
    self._impl.ioloop.process_timeouts()
  File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/adapters/select_connection.py", line 497, in process_timeouts
    self._timer.process_timeouts()
  File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/adapters/select_connection.py", line 332, in process_timeouts
    timeout.callback()
  File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/heartbeat.py", line 109, in _send_heartbeat
    self._send_heartbeat_frame()
  File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/heartbeat.py", line 181, in _send_heartbeat_frame
    self._connection._send_frame(  # pylint: disable=W0212
  File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/connection.py", line 2241, in _send_frame
    self._output_marshaled_frames([marshaled_frame])
  File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/connection.py", line 2332, in _output_marshaled_frames
    self._adapter_emit_data(marshaled_frame)
  File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/adapters/base_connection.py", line 386, in _adapter_emit_data
    self._transport.write(data)
  File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/adapters/utils/io_services_utils.py", line 1030, in write
    self._nbio.set_writer(self._sock.fileno(), self._on_socket_writable)
  File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/adapters/utils/selector_ioloop_adapter.py", line 332, in set_writer
    self._loop.update_handler(
  File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/adapters/select_connection.py", line 531, in update_handler
    self._poller.update_handler(fd, events)
  File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/adapters/select_connection.py", line 720, in update_handler
    self._modify_fd_events(
  File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/adapters/select_connection.py", line 1241, in _modify_fd_events
    self._poll.modify(fileno, events)
FileNotFoundError: [Errno 2] No such file or directory

What can I do about this issue?

发布评论

评论列表(0)

  1. 暂无评论