I have a task for caching and receiving weather status of major cities in the world. I use RabbitMQ as a broker and use Redis for caching weather data, then I wrote a function to set weather data in Redis and schedule it with Celery. When I ran it alone it works but when I ran it with Celery scheduler didn't work.
It's my task.py
from celery import Celery
from weatherAPI import MajorCityWeather
# Create a Celery instance
app = Celery('tasks', broker='amqp://guest:guest@localhost//', broker_heartbeat=600,broker_connection_retry_on_startup=True)
# # Configure the Celery beat schedule
a=MajorCityWeather(base_url=";)
@app.task
def print_time():
a.fetch_major_city_weather()
print('ok')
app.conf.beat_schedule = {
'print-time-every-80-seconds': {
'task': 'tasks.print_time',
'schedule': 80, # Run every 10 seconds
},
}
app.conf.timezone = 'UTC'
it's my WeatherAPI.py
from requests import get, RequestException
from typing import Tuple, Dict, Optional
from producer import RPCClient
from datetime import datetime
from pybreaker import CircuitBreaker
from ast import literal_eval
class WeatherCodes:
"""
A class that provides weather condition descriptions based on weather codes.
The `codes` dictionary contains weather codes and their corresponding descriptions.
"""
codes = {
0: 'Clear sky', 1: 'Mainly clear', 2: 'Partly cloudy', 3: 'Overcast',
45: 'Fog', 48: 'Depositing rime fog', 51: 'Light Drizzle', 53: 'Moderate Drizzle',
55: 'Dense intensity Drizzle', 56: 'Freezing Drizzle Light', 57: 'Freezing Drizzle dense intensity',
61: 'Rain Slight', 63: 'Moderate rain', 65: 'Heavy intensity rain', 66: 'Freezing rain light',
67: 'Freezing rain heavy intensity', 71: 'Snow fall slight', 73: 'Moderate Snow fall',
75: 'Heavy intensity snow fall', 77: 'Snow grains', 80: 'Slight rain showers',
81: 'Moderate rain showers', 82: 'Violent rain showers', 85: 'Snow showers slight',
86: 'Snow showers heavy', 95: 'Thunderstorm: Slight or moderate', 96: 'Thunderstorm with slight hail',
99: 'Thunderstorm with heavy hail'
}
@classmethod
def get_condition(cls, code: int) -> str:
"""
Get the weather condition description based on the weather code.
:param code: An integer representing the weather code.
:return: A string description of the weather condition.
"""
return cls.codes.get(code, 'Unknown condition')
@classmethod
def get_weather_code(cls, condition: str) -> int:
"""
Get the weather code for a given condition.
:param condition: A string representing the weather condition.
:return: An integer representing the weather code for the given condition.
"""
for code, desc in cls.codes.items():
if desc == condition:
return code
return -1 # Return -1 or some other value to indicate "Unknown condition"
class WeatherService:
"""
A service class to fetch weather information from an external API and handle caching.
This class interacts with the weather API, stores the results in a cache (using Redis),
and implements a circuit breaker to prevent repeated failures from affecting performance.
"""
circuit_breaker = CircuitBreaker(fail_max=3, reset_timeout=60)
def __init__(self, base_url: str):
"""
Initialize the WeatherService with the base URL of the weather API.
:param base_url: The base URL for the weather API.
"""
self.base_url = base_url
self.rpc_client = RPCClient()
self.error_message = {"message": "Weather data unavailable"}
@circuit_breaker
def _make_request(self, latitude: str, longitude: str) -> Tuple[Optional[Dict], Optional[str]]:
"""
Fetch the current weather from the weather API for the given latitude and longitude.
:param latitude: The latitude of the location.
:param longitude: The longitude of the location.
:return: A tuple containing weather data or None if there is an error, and an error message (if applicable).
"""
try:
response = get(self.base_url, params={
'latitude': latitude,
'longitude': longitude,
'current': 'temperature_2m,relative_humidity_2m,is_day,weather_code,wind_speed_10m',
'timezone': 'UTC',
'timeformat': 'unixtime'
})
data = response.json()
weather_info = {
'temperature': data['current']['temperature_2m'],
'condition': data['current']['weather_code'],
'humidity': data['current']['relative_humidity_2m'],
'wind_speed': data['current']['wind_speed_10m'],
'time': data['current']['time']
}
return weather_info, None
except RequestException as e:
return None, str(e)
def is_data_fresh(self, result: Dict, current_unix_timestamp: int) -> bool:
"""
Check if the cached data is still valid (within 1 hour).
:param result: The cached weather data.
:param current_unix_timestamp: The current time as a Unix timestamp.
:return: True if the cached data is fresh, False otherwise.
"""
difference = abs(current_unix_timestamp - int(result['time']))
hours = difference // 3600
return hours == 0
def _handle_request_with_circuit_breaker(self, latitude: float, longitude: float):
"""
Handle a weather request using the circuit breaker. If the request fails, it returns a fallback error message.
:param latitude: The latitude of the location.
:param longitude: The longitude of the location.
:return: Weather data if successful, or an error message if the request fails.
"""
weather_info, error = self._make_request(
latitude=latitude, longitude=longitude)
if weather_info:
return weather_info
else:
return self.error_message
def get_data(self, key: str) -> Dict:
"""
Get cached weather data from the Redis store.
:param key: The Redis key for the cached data.
:return: The cached weather data.
"""
result = self.rpc_client.call('hget', key)
return result
def set_data(self, key: str, values: str) -> None:
"""
Set weather data in the Redis store.
:param key: The Redis key to store the data under.
:param values: The data to store in Redis.
"""
result = self.rpc_client.call('hset', key, values)
return result
def Is_key_exists(self, key: str) -> bool:
"""
Check if a cache key exists in Redis.
:param key: The Redis key to check.
:return: True if the key exists, False otherwise.
"""
exists = self.rpc_client.call('kexists', key)
if exists['exists']:
return True
else:
return False
def request_weather(self, latitude: float, longitude: float):
"""
Fetch the weather either from the cache or by requesting from the API.
:param latitude: The latitude of the location.
:param longitude: The longitude of the location.
:return: The weather data, either from the cache or freshly fetched.
"""
if self.Is_key_exists(f'({latitude},{longitude})'):
current_unix_timestamp = int(datetime.now().timestamp())
result = self.get_data(f'({latitude},{longitude})')
data = (result['data'])
if not (str(self.error_message) == str(data)) and self.is_data_fresh(data, current_unix_timestamp):
return data
else:
return self._handle_request_with_circuit_breaker(latitude, longitude)
else:
return self._handle_request_with_circuit_breaker(latitude, longitude)
def is_valid_coordinates(self, latitude: float, longitude: float) -> bool:
"""
Validate the latitude and longitude values.
:param latitude: The latitude to check.
:param longitude: The longitude to check.
:return: True if the coordinates are valid, False otherwise.
"""
return -90 <= latitude <= 90 and -180 <= longitude <= 180
def lat_long_request(self, latitude: float, longitude: float):
"""
Handle a request for weather data at a given latitude and longitude, either from cache or API.
:param latitude: The latitude of the location.
:param longitude: The longitude of the location.
"""
if self.Is_key_exists(f'({latitude},{longitude})'):
current_unix_timestamp = int(datetime.now().timestamp())
result = self.get_data(f'({latitude},{longitude})')
data = literal_eval(str(result['data']))
if not (str(self.error_message) == str(data)) and self.is_data_fresh(data, current_unix_timestamp):
return self.get_data(f'({latitude},{longitude})')
else:
weather_info = self.request_weather(
latitude=latitude, longitude=longitude)
self.set_data(
key=f'({latitude},{longitude})', values=weather_info)
else:
weather_info = self.request_weather(
latitude=latitude, longitude=longitude)
if not (str(self.error_message) == str(weather_info)):
self.set_data(
key=f'({latitude},{longitude})', values=weather_info)
class MajorCityWeather:
"""
A class that manages weather data for major cities across various countries.
This class retrieves and stores weather data for predefined major cities in a cache.
"""
def __init__(self, base_url: str):
"""
Initialize the cities and weather service.
:param base_url: The base URL for the weather API.
"""
self.lat_long = {
'USA': {'New York': {'Latitude': 40.7128, 'Longitude': -74.0060}},
'UK': {'London': {'Latitude': 51.5074, 'Longitude': -0.1278}},
'Japan': {'Tokyo': {'Latitude': 35.6762, 'Longitude': 139.6503}},
'France': {'Paris': {'Latitude': 48.8566, 'Longitude': 2.3522}},
'Iran': {'Tehran': {'Latitude': 35.6892, 'Longitude': 51.3890}}
}
self.openmeteo = WeatherService(base_url=base_url)
self.key = 'major_city_weather_info'
def request_major_city_weather(self) -> dict:
"""
Fetch and store weather data for major cities.
:return: A dictionary with weather data for all major cities.
"""
major_city_weather_info = {}
for country, cities in self.lat_long.items():
for city, coords in cities.items():
latitude = coords['Latitude']
longitude = coords['Longitude']
weather_data = self.openmeteo.request_weather(
latitude=latitude, longitude=longitude)
major_city_weather_info[country] = str({city: weather_data})
return dict(major_city_weather_info)
def get_city_weather(self):
"""
Retrieve cached weather data for major cities.
:return: The cached weather data for major cities.
"""
return self.openmeteo.get_data(key=self.key)
def set_city_weather(self):
"""
Fetch and store fresh weather data for major cities.
This updates the cached weather data for all major cities.
"""
req = self.request_major_city_weather()
if not str(req) == str({'USA': "{'New York': {'message': 'Weather data unavailable'}}", 'UK': "{'London': {'message': 'Weather data unavailable'}}", 'Japan': "{'Tokyo': {'message': 'Weather data unavailable'}}", 'France': "{'Paris': {'message': 'Weather data unavailable'}}", 'Iran': "{'Tehran': {'message': 'Weather data unavailable'}}"}):
self.openmeteo.set_data(key=self.key, values=dict(req))
def fetch_major_city_weather(self):
"""
Fetch the weather data for major cities, checking if the data is fresh.
If the data is outdated or unavailable, it will refresh the cache.
"""
if self.openmeteo.Is_key_exists(self.key):
current_unix_timestamp = int(datetime.now().timestamp())
result = literal_eval(str(self.openmeteo.get_data(self.key)))
data = dict(literal_eval(result['data']['Iran'])['Tehran'])
if not str(data) == str(self.openmeteo.error_message) and result['status'] == 'success' and self.openmeteo.is_data_fresh(data, current_unix_timestamp):
return result['data']
else:
self.set_city_weather()
else:
self.set_city_weather()
it's producer.py
import pika
import json
import uuid
from conf import rabbitmq_conf
class RPCClient:
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_conf['host'],heartbeat=600))
self.channel = self.connection.channel()
self.channel.queue_declare(queue=rabbitmq_conf['queue'][0],durable=True) # Declare the RPC queue
self.response = None
self.corr_id = None
# Declare a temporary queue for responses
result = self.channel.queue_declare(queue=rabbitmq_conf['queue'][1], durable=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True)
def on_response(self, ch, method, properties, body):
if self.corr_id == properties.correlation_id:
self.response = body
def call(self, operation, key, values=None):
self.response = None
self.corr_id = str(uuid.uuid4())
message = {'operation': operation, 'key': key, 'values': values}
self.channel.basic_publish(
exchange=rabbitmq_conf['exchange'],
routing_key=rabbitmq_conf['queue'][0],
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=json.dumps(message)
)
while self.response is None:
self.connection.process_data_events()
return json.loads(self.response)
When I ran my Celery worker and Celery beats, they are irresponsible and i got this error :
Traceback (most recent call last):
File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/celery/app/trace.py", line 453, in trace_task
R = retval = fun(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^
File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/celery/app/trace.py", line 736, in __protected_call__
return self.run(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aryan/Documents/weather_task/weather_service/tasks/tasks.py", line 13, in print_time
a.fetch_major_city_weather()
File "/home/aryan/Documents/weather_task/weather_service/tasks/weatherAPI.py", line 289, in fetch_major_city_weather
self.set_city_weather()
File "/home/aryan/Documents/weather_task/weather_service/tasks/weatherAPI.py", line 270, in set_city_weather
req = self.request_major_city_weather()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aryan/Documents/weather_task/weather_service/tasks/weatherAPI.py", line 251, in request_major_city_weather
weather_data = self.openmeteo.request_weather(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aryan/Documents/weather_task/weather_service/tasks/weatherAPI.py", line 170, in request_weather
if self.Is_key_exists(f'({latitude},{longitude})'):
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aryan/Documents/weather_task/weather_service/tasks/weatherAPI.py", line 156, in Is_key_exists
exists = self.rpc_client.call('kexists', key)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aryan/Documents/weather_task/weather_service/tasks/producer.py", line 37, in call
self.connection.process_data_events()
File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/adapters/blocking_connection.py", line 845, in process_data_events
self._flush_output(timer.is_ready, common_terminator)
File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/adapters/blocking_connection.py", line 515, in _flush_output
self._impl.ioloop.process_timeouts()
File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/adapters/select_connection.py", line 497, in process_timeouts
self._timer.process_timeouts()
File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/adapters/select_connection.py", line 332, in process_timeouts
timeout.callback()
File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/heartbeat.py", line 109, in _send_heartbeat
self._send_heartbeat_frame()
File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/heartbeat.py", line 181, in _send_heartbeat_frame
self._connection._send_frame( # pylint: disable=W0212
File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/connection.py", line 2241, in _send_frame
self._output_marshaled_frames([marshaled_frame])
File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/connection.py", line 2332, in _output_marshaled_frames
self._adapter_emit_data(marshaled_frame)
File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/adapters/base_connection.py", line 386, in _adapter_emit_data
self._transport.write(data)
File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/adapters/utils/io_services_utils.py", line 1030, in write
self._nbio.set_writer(self._sock.fileno(), self._on_socket_writable)
File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/adapters/utils/selector_ioloop_adapter.py", line 332, in set_writer
self._loop.update_handler(
File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/adapters/select_connection.py", line 531, in update_handler
self._poller.update_handler(fd, events)
File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/adapters/select_connection.py", line 720, in update_handler
self._modify_fd_events(
File "/home/aryan/Documents/weather_task/.venv/lib/python3.12/site-packages/pika/adapters/select_connection.py", line 1241, in _modify_fd_events
self._poll.modify(fileno, events)
FileNotFoundError: [Errno 2] No such file or directory
What can I do about this issue?