Average aggregation of data stream in bytewax - Stack Overflow

I want to aggregate the values of my DataStream in tumbling windows of 10 seconds.

Unfortunately, the Bytewax documentation is very limited, and I can't find any other source that shows how to compute an average over the data.

I have the following script. The reducer I pass to reduce_window adds up the values, but all my attempts to divide the sum to get the average have failed.

import json
import time
from datetime import datetime, timedelta, timezone

from bytewax.connectors.kafka import KafkaSource
from bytewax import operators as op
from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutSink

from bytewax.operators.windowing import (
    EventClock,
    TumblingWindower,
    reduce_window,
)


time.sleep(10)


# Define the Bytewax dataflow
flow = Dataflow("Example-Flow")

# Kafka Source (consume messages from topic)
kafka_source = KafkaSource(
    brokers=["kafka:9093"],
    topics=["factory_001"],
)

def extract_value(msg):
    """Extract JSON data from KafkaSourceMessage."""
    try:
        # Decode byte string to a normal string
        message_str = msg.value.decode("utf-8")

        # Convert JSON string to Python dictionary
        message_dict = json.loads(json.loads(message_str))

        return message_dict

    except Exception as e:
        print(f"Error parsing Kafka message: {e}")
        return None  # Return None if there's an error

def extract_timestamp(msg):
    """Extract and convert Kafka timestamp"""
    return datetime.strptime(msg["timestamp"], '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)

kinp = op.input("kafka-in", flow, kafka_source)

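# filter_map drops the None that extract_value returns for unparsable messages,
# so key_on below never receives None
mapped = op.filter_map("extract_string", kinp, extract_value)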

keyed_stream = op.key_on("key_on_engine_id", mapped, lambda e: e["engine_id"])

clock = EventClock(
    ts_getter=extract_timestamp,
    wait_for_system_duration=timedelta(seconds=10)
)
windower = TumblingWindower(
    length=timedelta(seconds=10),
    align_to=datetime(2024, 8, 29, 8, 0, 0, tzinfo=timezone.utc)
)

def add(acc, x):
    acc["temp_air"] += x["temp_air"]
    return acc

windowed_avg = reduce_window(
    step_id="average_temp_air",
    up=keyed_stream,
    clock=clock,
    windower=windower,
    reducer=add
)

op.output("out", windowed_avg.down, StdOutSink())
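
As a side note on the windower above, the integer that precedes each value in the emitted tuples appears to be a window id. A minimal sketch of how such an id could be derived, assuming windows are numbered by whole window lengths elapsed since align_to (this numbering is an assumption, although it is consistent with the ids in the output shown in this question; window_id and window_bounds are hypothetical helper names, not Bytewax functions):

```python
from datetime import datetime, timedelta, timezone

# Same values as the TumblingWindower in the question
ALIGN_TO = datetime(2024, 8, 29, 8, 0, 0, tzinfo=timezone.utc)
LENGTH = timedelta(seconds=10)


def window_id(ts):
    """Assumed numbering: whole window lengths elapsed since ALIGN_TO."""
    return (ts - ALIGN_TO) // LENGTH


def window_bounds(wid):
    """Return the (open, close) times of the window with the given id."""
    open_time = ALIGN_TO + wid * LENGTH
    return open_time, open_time + LENGTH


ts = datetime(2025, 3, 20, 15, 19, 45, tzinfo=timezone.utc)
wid = window_id(ts)
print(wid)                 # 1756558
print(window_bounds(wid))  # window spans 15:19:40 to 15:19:50 UTC
```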

Sample of the stream written to the output:

('engine_001', (1756558, {'factory_id': 'factory_001', 'engine_id': 'engine_001', 'timestamp': '2025-03-20 15:19:45', 'temp_air': 499.29999999999995, 'temp_oil': 89.23, 'temp_exhaust': 759.54, 'vibration': 3.15, 'pressure_1': 149.81, 'pressure_2': 150.94, 'rpm': 2999}))
('engine_002', (1756558, {'factory_id': 'factory_001', 'engine_id': 'engine_002', 'timestamp': '2025-03-20 15:19:45', 'temp_air': 499.27000000000004, 'temp_oil': 89.24, 'temp_exhaust': 759.39, 'vibration': 2.54, 'pressure_1': 149.04, 'pressure_2': 151.2, 'rpm': 2998}))
('engine_001', (1756559, {'factory_id': 'factory_001', 'engine_id': 'engine_001', 'timestamp': '2025-03-20 15:19:50', 'temp_air': 1000.6199999999999, 'temp_oil': 88.94, 'temp_exhaust': 759.77, 'vibration': 2.78, 'pressure_1': 150.19, 'pressure_2': 149.25, 'rpm': 2999}))
('engine_002', (1756559, {'factory_id': 'factory_001', 'engine_id': 'engine_002', 'timestamp': '2025-03-20 15:19:50', 'temp_air': 1004.8700000000001, 'temp_oil': 88.81, 'temp_exhaust': 759.67, 'vibration': 1.88, 'pressure_1': 149.33, 'pressure_2': 149.04, 'rpm': 3000}))
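
One possible way to get an average is to carry a count alongside the sum. reduce_window uses the first item of a window as the initial accumulator, so a reducer that only adds temp_air has no way to know how many items it has seen. The sketch below is plain Python and runs without Bytewax. The helper names to_partial, add_partials and to_average are hypothetical, and the readings are made up for illustration.

```python
from functools import reduce


def to_partial(event):
    """Keep the timestamp for the event clock and start a sum/count tally."""
    return {"timestamp": event["timestamp"], "sum": event["temp_air"], "count": 1}


def add_partials(acc, x):
    """Reducer: combine two tallies into a new dict, leaving both inputs unchanged."""
    return {
        "timestamp": acc["timestamp"],
        "sum": acc["sum"] + x["sum"],
        "count": acc["count"] + x["count"],
    }


def to_average(item):
    """Turn (key, (window_id, tally)) into (key, (window_id, mean))."""
    key, (window_id, tally) = item
    return (key, (window_id, tally["sum"] / tally["count"]))


# Stand-alone check with made-up readings for a single window
events = [
    {"timestamp": "2025-03-20 15:19:45", "temp_air": 99.0},
    {"timestamp": "2025-03-20 15:19:46", "temp_air": 100.0},
    {"timestamp": "2025-03-20 15:19:47", "temp_air": 101.0},
]
tally = reduce(add_partials, map(to_partial, events))
result = to_average(("engine_001", (0, tally)))
print(result)  # ('engine_001', (0, 100.0))
```

In the dataflow from the question, this would probably be wired as op.map_value("to_partial", keyed_stream, to_partial) before the window step, reducer=add_partials in reduce_window, and op.map("to_average", windowed_avg.down, to_average) before op.output. That wiring is untested here. Keeping the timestamp in the tally lets extract_timestamp continue to work as the ts_getter. fold_window from the same module, which takes an explicit initial accumulator, may be a cleaner alternative.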