I'm working with a 4-year dataset that I split into smaller time intervals (chunks) to send API requests to Thingsboard. I experimented with different chunk sizes—3 hours, 6 hours, 12 hours, and 24 hours. Since the overall time period remains the same (4 years), I expected the total amount of data retrieved to be consistent across different chunk sizes. However, I'm seeing inconsistent total data sizes depending on the chunk size used.
I've thoroughly tested my script and I'm confident that the implementation is correct. It appears that the discrepancy comes from the Thingsboard API, which returns varying data amounts for each chunk.
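To make the comparison concrete, the total can be measured as the number of individual data points returned per chunk, e.g. with a small helper along these lines (a sketch only, not part of the script below; it assumes ThingsBoard's usual `{key: [{"ts": ..., "value": ...}, ...]}` response shape):

```python
# Sketch: count the data points in one telemetry response, assuming the
# ThingsBoard response shape {key: [{"ts": ..., "value": ...}, ...]}.
def count_points(response_json: dict) -> int:
    return sum(len(entries) for entries in response_json.values())

# Summing this over every chunk should give the same grand total for
# 3 h, 6 h, 12 h and 24 h chunk sizes if nothing is duplicated or dropped.
```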
Script:

```python
import json
import httpx
from urllib.parse import urlencode
import polars as pl
import asyncio
import os
import re
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from urllib.parse import urljoin
from itertools import chain
from config import BASE_URL, DATAFRAME_AFTER_ANALYSE
import time
numberofkeys = 0
OUTPUT_FILE = "/root/main-kafka-main/ALLDATASETS/DATAFROMMETAMORPHOSIS/new_2.jsonl"
RAW_RESPONSE_DIR = "raw_responses" # Directory to save raw API responses
error_file = []
def create_polling(deviceid, timestamp, slaveadd, template, modbusmap):
    # My template comes here; I deleted it because it is long.
    ...
# Function to fetch or load cached telemetry keys
async def get_cached_keys(client, header, entityID):
    cache_dir = "keys_cache"
    os.makedirs(cache_dir, exist_ok=True)
    cache_file = os.path.join(cache_dir, f"{entityID}.json")
    if os.path.exists(cache_file):
        with open(cache_file, "r") as f:
            keys = json.load(f)
    else:
        keys = await get_keys(client, header, entityID)
        with open(cache_file, "w") as f:
            json.dump(keys, f)
    return keys
@retry(
    retry=retry_if_exception_type(httpx.HTTPStatusError),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
async def get_keys(client, header, entityID):
    all_keys_url = urljoin(BASE_URL, f"/api/plugins/telemetry/DEVICE/{entityID}/keys/timeseries")
    response = await client.get(all_keys_url, headers=header)
    response.raise_for_status()
    data = response.json()
    # Keep only keys that look like register addresses (e.g. "00A1", "00A1_H", "00A1_I")
    pattern = re.compile(r"^[0-9A-Fa-f]{4}(_H|_I)?$")
    filtered_keys = [key for key in data if key and len(key) <= 6 and pattern.match(key)]
    return filtered_keys
@retry(
    retry=retry_if_exception_type(httpx.HTTPStatusError),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
async def get_telemetry_data(client, header, entityID, keys, useStrictDataTypes, startTs, endTs):
    telemetry_value_path = f"/api/plugins/telemetry/DEVICE/{entityID}/values/timeseries"
    query_params = {
        "keys": ",".join(keys),
        "useStrictDataTypes": str(useStrictDataTypes).lower(),
        "startTs": startTs,
        "endTs": endTs,
        # Deliberately huge limit so a single request should cover every point in the chunk
        "limit": 53 * 100000000000,
        "agg": "NONE",
        "interval": 0,
        "intervalType": "MILLISECONDS"
    }
    url = f"{urljoin(BASE_URL, telemetry_value_path)}?{urlencode(query_params)}"
    response = await client.get(url, headers=header)
    response.raise_for_status()
    return response.json()
def generate_time_chunks(start_ts, end_ts, chunk_size_ms):
    # Split the overall range into consecutive chunks; each chunk's end is the next chunk's start
    current_start = start_ts
    while current_start < end_ts:
        current_end = min(current_start + chunk_size_ms, end_ts)
        yield current_start, current_end
        current_start = current_end
async def get_device_data(semaphore, client, header, entityID, useStrictDataTypes, startTs, endTs, entityName, entityType, chunk_size_ms=1 * 24 * 3600 * 1000):
    async with semaphore:
        try:
            filtered_keys = await get_cached_keys(client, header, entityID)
            if not filtered_keys:
                return
            for chunk_start, chunk_end in generate_time_chunks(startTs, endTs, chunk_size_ms):
                try:
                    data = await get_telemetry_data(client, header, entityID, filtered_keys, useStrictDataTypes, chunk_start, chunk_end)
                    if not data:
                        continue
                    timestamps = {entry["ts"] for entry in chain.from_iterable(data.values())}
                    timestamps = sorted(timestamps)
                    # Device name is "<deviceid>-<slave address>"
                    parts = entityName.rsplit("-", 1)
                    deviceid = parts[0]
                    try:
                        slaveadd = int(parts[1] if len(parts) > 1 else "")
                    except ValueError:
                        slaveadd = parts[1] if len(parts) > 1 else ""
                    # Re-group the response by timestamp: {ts: {key: value}}
                    data_by_ts = {}
                    for key, entries in data.items():
                        for entry in entries:
                            ts = entry["ts"]
                            data_by_ts.setdefault(ts, {})[key] = entry["value"]
                    pattern_key = re.compile(r"^([0-9A-Fa-f]{4})(?:_H|_I)$")
                    for ts in timestamps:
                        modbusmap = []
                        ts_data = data_by_ts.get(ts, {})
                        for key in filtered_keys:
                            try:
                                if key not in ts_data:
                                    continue
                                if len(key) == 6:
                                    new_key = key[:4].lower()
                                    if key.endswith("H"):
                                        modbusmap.append({"addr": f"0x{new_key}", "type": 0, "val": ts_data[key]})
                                    else:
                                        modbusmap.append({"addr": f"0x{new_key}", "type": 1, "val": ts_data[key]})
                                else:
                                    new_key = key.lower()
                                    modbusmap.append({"addr": f"0x{new_key}", "type": 0, "val": ts_data[key]})
                                    modbusmap.append({"addr": f"0x{new_key}", "type": 1, "val": ts_data[key]})
                            except Exception as e:
                                print(f"Error processing key '{key}' for timestamp {ts} in chunk {chunk_start}-{chunk_end} for device {entityID}: {e}")
                                error_file.append(e)
                                continue
                        polling = create_polling(deviceid, ts, slaveadd, entityType, modbusmap)
                        yield polling
                except Exception as e:
                    print(f"Error processing chunk {chunk_start}-{chunk_end} for device {entityID}: {e}")
                    continue
        except Exception as r:
            print(f"Error processing device {entityID}: {r}")
async def process_device(queue, semaphore, client, header, entityID, useStrictDataTypes, startTs, endTs, entityName, entityType):
    async for polling in get_device_data(semaphore, client, header, entityID, useStrictDataTypes, startTs, endTs, entityName, entityType):
        await queue.put(polling)
async def writer(queue, file_path):
    # Single writer task: drains the queue and appends one JSON line per polling record
    with open(file_path, "w") as f:
        while True:
            item = await queue.get()
            if item is None:
                break
            json_str = json.dumps(item)
            f.write(json_str + "\n")
            queue.task_done()
async def get_all_devices_from_file(file_name, token, useStrictDataTypes, startTs, endTs):
    start_time = time.time()
    file_path = f"{DATAFRAME_AFTER_ANALYSE}{file_name}.csv"
    df_devices = pl.read_csv(file_path, columns=["id", "name", "type"])
    header = {"Authorization": f"Bearer {token}"}
    async with httpx.AsyncClient(http2=True) as client:
        semaphore = asyncio.Semaphore(50)
        queue = asyncio.Queue()
        writer_task = asyncio.create_task(writer(queue, OUTPUT_FILE))
        tasks = []
        for entityID, entityName, entityType in df_devices.select(["id", "name", "type"]).iter_rows():
            task = asyncio.create_task(
                process_device(queue, semaphore, client, header, entityID, useStrictDataTypes, startTs, endTs, entityName, entityType)
            )
            tasks.append(task)
        await asyncio.gather(*tasks)
        await queue.put(None)
        await writer_task
    end_time = time.time()
    execution_time = end_time - start_time
    return f"Execution time: {execution_time:.6f} seconds"
```
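One detail I am not sure about: in `generate_time_chunks`, each chunk's `endTs` is the exact same millisecond as the next chunk's `startTs`. Depending on whether ThingsBoard treats `startTs`/`endTs` as inclusive or exclusive (I have not verified which), samples that fall exactly on a boundary could be returned twice or skipped, and larger chunks have fewer boundaries, which could make the totals differ by chunk size. A minimal sketch of a boundary-safe variant, assuming both bounds are inclusive (an unverified assumption), would be:

```python
def generate_time_chunks_non_overlapping(start_ts, end_ts, chunk_size_ms):
    # Sketch only: partition [start_ts, end_ts] into inclusive ranges that
    # never share a boundary millisecond, assuming ThingsBoard treats both
    # startTs and endTs as inclusive (not verified here). If endTs is in
    # fact exclusive, the original generator already has no overlap.
    current_start = start_ts
    while current_start <= end_ts:
        current_end = min(current_start + chunk_size_ms - 1, end_ts)
        yield current_start, current_end
        current_start = current_end + 1
```

Comparing the raw total of returned points against the number of unique `(key, ts)` pairs across all chunks should also show whether any difference comes from duplicated boundary samples or from genuinely missing data.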
My questions are:
- Has anyone experienced similar behavior with Thingsboard or any other API when splitting data by time intervals?
- What could be causing these inconsistencies in the API response?
- Are there any best practices or known workarounds when dealing with such scenarios?