
python - Inconsistent API Data Size When Splitting a 4-Year Dataset into Various Time Chunks (ThingsBoard) - Stack Overflow


I'm working with a 4-year dataset that I split into smaller time intervals (chunks) to send API requests to ThingsBoard. I experimented with different chunk sizes: 3 hours, 6 hours, 12 hours, and 24 hours. Since the overall time period stays the same (4 years), I expected the total amount of data retrieved to be the same regardless of chunk size. However, the totals I get back differ depending on the chunk size used.

I've thoroughly tested my script and I'm confident the implementation is correct. The discrepancy appears to come from the ThingsBoard API itself, which returns varying amounts of data for each chunk.
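
For concreteness, one way to quantify the total is to count individual samples, i.e. (key, ts, value) entries, summed over every chunk of a run. Below is a minimal, illustrative helper for that count; it matches the response shape the script further down receives, but it is not part of the script itself:

# Illustrative only: count the samples in a single timeseries response,
# whose shape is {key: [{"ts": ..., "value": ...}, ...]} as handled by
# get_telemetry_data() in the script below.
def count_samples(response: dict) -> int:
    return sum(len(entries) for entries in response.values())

# Summing count_samples() over all chunks of a run gives a per-run total;
# it is this kind of total that I would expect to match across the
# 3 h, 6 h, 12 h and 24 h runs.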

Script:

import json
import httpx
from urllib.parse import urlencode
import polars as pl
import asyncio
import os
import re
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from urllib.parse import urljoin
from itertools import chain
from config import BASE_URL, DATAFRAME_AFTER_ANALYSE
import time

numberofkeys = 0
OUTPUT_FILE = "/root/main-kafka-main/ALLDATASETS/DATAFROMMETAMORPHOSIS/new_2.jsonl"
RAW_RESPONSE_DIR = "raw_responses"  # Directory to save raw API responses
error_file = []


def create_polling(deviceid, timestamp, slaveadd, template, modbusmap):
    # My template is built and returned here; I removed it because it is long.
    ...


# Function to fetch or load cached telemetry keys
async def get_cached_keys(client, header, entityID):
    cache_dir = "keys_cache"
    os.makedirs(cache_dir, exist_ok=True)
    cache_file = os.path.join(cache_dir, f"{entityID}.json")

    if os.path.exists(cache_file):
        with open(cache_file, "r") as f:
            keys = json.load(f)
    else:
        keys = await get_keys(client, header, entityID)
        with open(cache_file, "w") as f:
            json.dump(keys, f)
    return keys

@retry(
    retry=retry_if_exception_type(httpx.HTTPStatusError),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
async def get_keys(client, header, entityID):
    all_keys_url = urljoin(BASE_URL, f"/api/plugins/telemetry/DEVICE/{entityID}/keys/timeseries")
    response = await client.get(all_keys_url, headers=header)
    response.raise_for_status()
    data = response.json()
    pattern = re.compile(r"^[0-9A-Fa-f]{4}(_H|_I)?$")
    filtered_keys = [key for key in data if key and len(key) <= 6 and pattern.match(key)]
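    # Keys that survive this filter look like "00A1", "00a1_H" or "1F2C_I":
    # four hex digits, optionally followed by _H or _I (six characters at most).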
  
    return filtered_keys

@retry(
    retry=retry_if_exception_type(httpx.HTTPStatusError),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
async def get_telemetry_data(client, header, entityID, keys, useStrictDataTypes, startTs, endTs):

    telemetry_value_path = f"/api/plugins/telemetry/DEVICE/{entityID}/values/timeseries"
    query_params = {
        "keys": ",".join(keys),
        "useStrictDataTypes": str(useStrictDataTypes).lower(),
        "startTs": startTs,
        "endTs": endTs,
        "limit": 53*100000000000,
        "agg":"NONE", 
        "interval":0,
        "intervalType":"MILLISECONDS"
    }
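    # Note: agg=NONE requests raw (non-aggregated) samples, and the very large
    # "limit" is intended to keep the server from truncating any chunk's response.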
    url = f"{urljoin(BASE_URL, telemetry_value_path)}?{urlencode(query_params)}"
    response = await client.get(url, headers=header)
    response.raise_for_status()
    return response.json()

@retry(
    retry=retry_if_exception_type(httpx.HTTPStatusError),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10))
def generate_time_chunks(start_ts, end_ts, chunk_size_ms):
    current_start = start_ts
    while current_start < end_ts:
        current_end = min(current_start + chunk_size_ms, end_ts)
        yield current_start, current_end
        current_start = current_end
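
# Worked example (illustrative, not part of the script): with chunk_size_ms = 24*3600*1000,
# generate_time_chunks(0, 3*24*3600*1000, chunk_size_ms) yields
# (0, 86400000), (86400000, 172800000), (172800000, 259200000),
# i.e. each chunk's end timestamp is reused as the next chunk's start.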



async def get_device_data(semaphore, client, header, entityID, useStrictDataTypes, startTs, endTs, entityName, entityType, chunk_size_ms=1*24*3600*1000):
    async with semaphore:
        try:
            filtered_keys = await get_cached_keys(client, header, entityID)
            if not filtered_keys:
                return
            for chunk_start, chunk_end in generate_time_chunks(startTs, endTs, chunk_size_ms):
                try:
                    data = await get_telemetry_data(client, header, entityID, filtered_keys, useStrictDataTypes, chunk_start, chunk_end)
                    if not data:
                        continue
                    timestamps = {entry["ts"] for entry in chain.from_iterable(data.values())}
                    timestamps = sorted(timestamps)
                    parts = entityName.rsplit("-", 1)
                    deviceid = parts[0]
                    try:
                        slaveadd = int(parts[1] if len(parts) > 1 else "")
                    except ValueError:
                        slaveadd = parts[1] if len(parts) > 1 else ""
                    data_by_ts = {}
                    for key, entries in data.items():
                        for entry in entries:
                            ts = entry["ts"]
                            data_by_ts.setdefault(ts, {})[key] = entry["value"]
                    pattern_key = re.compile(r"^([0-9A-Fa-f]{4})(?:_H|_I)$")
                    for ts in timestamps:
                        modbusmap = []
                        ts_data = data_by_ts.get(ts, {})
                        for key in filtered_keys:
                            try:
                                if key not in ts_data:
                                    continue
                                if len(key) == 6:
                                    new_key = key[:4].lower()
                                    if key.endswith("H"):
                                        modbusmap.append({"addr": f"0x{new_key}", "type": 0, "val": ts_data[key]})
                                    else:
                                        modbusmap.append({"addr": f"0x{new_key}", "type": 1, "val": ts_data[key]})
                                else:
                                    new_key = key.lower()
                                    modbusmap.append({"addr": f"0x{new_key}", "type": 0, "val": ts_data[key]})
                                    modbusmap.append({"addr": f"0x{new_key}", "type": 1, "val": ts_data[key]})
                            except Exception as e:
                                print(f"Error processing key '{key}' for timestamp {ts} in chunk {chunk_start}-{chunk_end} for device {entityID}: {e}")
                                error_file.append(e)
                                continue 
                        polling = create_polling(deviceid, ts, slaveadd, entityType, modbusmap)
                        yield polling
                except Exception as e:
                    print(f"Error processing chunk {chunk_start}-{chunk_end} for device {entityID}: {e}")
                    
                    continue
        except Exception as r:
            print(f"Error processing device {entityID}: {r}")
            
        
async def process_device(queue, semaphore, client, header, entityID, useStrictDataTypes, startTs, endTs, entityName, entityType):

    async for polling in get_device_data(semaphore, client, header, entityID, useStrictDataTypes, startTs, endTs, entityName, entityType):
        await queue.put(polling)

async def writer(queue, file_path):

    with open(file_path, "w") as f:
        while True:
            item = await queue.get()
            if item is None:
                break
            json_str = json.dumps(item)
            f.write(json_str + "\n")
            queue.task_done()

async def get_all_devices_from_file(file_name, token, useStrictDataTypes, startTs, endTs):

    start_time = time.time()
    file_path = f"{DATAFRAME_AFTER_ANALYSE}{file_name}.csv"
    df_devices = pl.read_csv(file_path, columns=["id", "name", "type"])
    header = {"Authorization": f"Bearer {token}"}

    async with httpx.AsyncClient(http2=True) as client:
        semaphore = asyncio.Semaphore(50)  
        queue = asyncio.Queue()
        writer_task = asyncio.create_task(writer(queue, OUTPUT_FILE))

        tasks = []
        for entityID, entityName, entityType in df_devices.select(["id", "name", "type"]).iter_rows():
            task = asyncio.create_task(
                process_device(queue, semaphore, client, header, entityID, useStrictDataTypes, startTs, endTs, entityName, entityType)
            )
            tasks.append(task)
        
        await asyncio.gather(*tasks)
        await queue.put(None) 
        await writer_task
    
    end_time = time.time()
    execution_time = end_time - start_time
    return f"Execution time: {execution_time:.6f} seconds"

My questions are:

  • Has anyone experienced similar behavior with ThingsBoard or any other API when splitting data by time intervals?
  • What could be causing these inconsistencies in the API response?
  • Are there any best practices or known workarounds when dealing with such scenarios?
