I am working on a real-time IoT application where sensor readings (including a timestamp, sensor ID, and measurement value) are stored in GridDB. To reduce the query load and improve response times for frequently accessed data, I want to maintain a local cache in my Python application. This cache should automatically refresh every few seconds with only the latest sensor data (for example, data from the last minute).
My challenges are:
- Continuously polling GridDB without blocking the main application thread.
- Updating the local cache with only the most recent sensor reading per sensor.
- Ensuring that the cache is refreshed in an “event-driven” manner (simulated here by a timed polling mechanism).
Below is a sample code snippet illustrating my approach:
import threading
import time
from datetime import datetime, timedelta
from griddb_python import StoreFactory
# Connect to GridDB
factory = StoreFactory.get_instance()
gridstore = factory.get_store(
    host="192.168.0.10",
    port=31999,
    cluster_name="defaultCluster",
    username="admin",
    password="Farwa_Admin"
)
container = gridstore.get_container("sensor_data")
# Local cache to store latest reading per sensor_id
local_cache = {}
# Define cache duration and refresh interval
CACHE_DURATION = timedelta(minutes=1)
REFRESH_INTERVAL = 10 # seconds
def refresh_cache():
    global local_cache
    now = datetime.now()
    try:
        # TQL does not take "?" parameters, so the window start is computed
        # on the server with NOW(); this also avoids local-time vs UTC drift
        window_seconds = int(CACHE_DURATION.total_seconds())
        query = container.query(
            f"SELECT * WHERE timestamp >= TIMESTAMPADD(SECOND, NOW(), -{window_seconds})"
        )
        rs = query.fetch(False)
        updated_cache = {}
        # Update cache: keep only the most recent reading for each sensor
        while rs.has_next():
            row = rs.next()  # assume row is a dict-like object with keys 'sensor_id', 'timestamp', 'value'
            sensor_id = row['sensor_id']
            # Store the row if the sensor is new or this reading is more recent
            if (sensor_id not in updated_cache
                    or row['timestamp'] > updated_cache[sensor_id]['timestamp']):
                updated_cache[sensor_id] = row
        local_cache = updated_cache
        print(f"Cache refreshed at {now.strftime('%Y-%m-%d %H:%M:%S')} with {len(local_cache)} entries.")
    finally:
        # Schedule the next cache refresh, even if this one raised an error
        timer = threading.Timer(REFRESH_INTERVAL, refresh_cache)
        timer.daemon = True  # do not keep the process alive after the main thread exits
        timer.start()
# Start the event-driven cache refresh process
refresh_cache()
# Main application loop can continue doing other work without being blocked
while True:
    # Example: Access the cache for quick sensor data retrieval
    # (In a real application, replace this with actual logic)
    time.sleep(REFRESH_INTERVAL)
    print("Current cache snapshot:", local_cache)
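A variation on the approach above, sketched under assumptions rather than tested against a live cluster: the polling runs in a single daemon thread instead of chained timers, and readers go through a lock-protected snapshot. The names `latest_per_sensor`, `SensorCache`, and `fetch_rows` are hypothetical. `fetch_rows` stands in for whatever function runs the GridDB query and returns rows as dicts with `sensor_id`, `timestamp`, and `value` keys, so the sketch makes no GridDB calls itself.

```python
import threading
from datetime import datetime, timedelta


def latest_per_sensor(rows):
    """Reduce an iterable of row dicts to the newest row for each sensor_id."""
    latest = {}
    for row in rows:
        current = latest.get(row["sensor_id"])
        if current is None or row["timestamp"] > current["timestamp"]:
            latest[row["sensor_id"]] = row
    return latest


class SensorCache:
    """Snapshot cache refreshed by a background daemon thread (hypothetical helper)."""

    def __init__(self, fetch_rows, interval_seconds):
        self._fetch_rows = fetch_rows  # assumed: callable returning recent rows as dicts
        self._interval = interval_seconds
        self._lock = threading.Lock()
        self._snapshot = {}
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def refresh(self):
        # Build the new snapshot outside the lock, then swap it in under the lock.
        snapshot = latest_per_sensor(self._fetch_rows())
        with self._lock:
            self._snapshot = snapshot

    def _run(self):
        while not self._stop.is_set():
            try:
                self.refresh()
            except Exception as exc:
                # Keep polling after a failed refresh; the old snapshot stays in place.
                print(f"Cache refresh failed: {exc}")
            # Event.wait acts as a sleep that stop() can interrupt.
            self._stop.wait(self._interval)

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._thread.join()

    def get(self, sensor_id):
        with self._lock:
            return self._snapshot.get(sensor_id)


if __name__ == "__main__":
    base = datetime(2024, 1, 1, 12, 0, 0)
    sample_rows = [
        {"sensor_id": "s1", "timestamp": base, "value": 20.5},
        {"sensor_id": "s1", "timestamp": base + timedelta(seconds=5), "value": 21.0},
        {"sensor_id": "s2", "timestamp": base, "value": 7.0},
    ]
    cache = SensorCache(lambda: sample_rows, interval_seconds=1)
    cache.refresh()
    print(cache.get("s1")["value"])
    cache.start()
    cache.stop()
```

Separating the reduction step from the polling loop means the "latest reading per sensor" logic can be checked with plain dicts, and a single long-lived thread avoids creating a new `Timer` thread on every refresh.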