I want to aggregate the values of my DataStream in tumbling windows of 10 seconds.
Unfortunately is the documentation in Bytewax very limited and I also don't find any other source where an average of the data is performed.
I have the following script and my reduce_window
function is adding up the values, but all my attempts to divide to get the average failed..
import json
import time
from datetime import datetime, timedelta, timezone
from bytewax.connectors.kafka import KafkaSource
from bytewax import operators as op
from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutSink
from bytewax.operators.windowing import (
EventClock,
TumblingWindower,
reduce_window,
)
time.sleep(10)
# Define the Bytewax dataflow
flow = Dataflow("Exmaple-Flow")
# Kafka Source (consume messages from topic)
kafka_source = KafkaSource(
brokers=["kafka:9093"],
topics=["factory_001"],
)
def extract_value(msg):
"""Extract JSON data from KafkaSourceMessage."""
try:
# Decode byte string to a normal string
message_str = msg.value.decode("utf-8")
# Convert JSON string to Python dictionary
message_dict = json.loads(json.loads(message_str))
return message_dict
except Exception as e:
print(f"Error parsing Kafka message: {e}")
return None # Return None if there's an error
def extract_timestamp(msg):
"""Extract and convert Kafka timestamp"""
return datetime.strptime(msg["timestamp"], '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
kinp = op.input("kafka-in", flow, kafka_source)
mapped = op.map("extract_string", kinp, lambda x: extract_value(x))
keyed_stream = op.key_on("key_on_engine_id", mapped, lambda e: e["engine_id"])
clock = EventClock(
ts_getter=extract_timestamp,
wait_for_system_duration=timedelta(seconds=10)
)
windower = TumblingWindower(
length=timedelta(seconds=10),
align_to=datetime(2024, 8, 29, 8, 0, 0, tzinfo=timezone.utc)
)
def add(acc, x):
acc["temp_air"] += x["temp_air"]
return acc
windowed_avg = reduce_window(
step_id="average_temp_air",
up=keyed_stream,
clock=clock,
windower=windower,
reducer=add
)
op.output("out", windowed_avg.down, StdOutSink())
Stream example returned by the output:
('engine_001', (1756558, {'factory_id': 'factory_001', 'engine_id': 'engine_001', 'timestamp': '2025-03-20 15:19:45', 'temp_air': 499.29999999999995, 'temp_oil': 89.23, 'temp_exhaust': 759.54, 'vibration': 3.15, 'pressure_1': 149.81, 'pressure_2': 150.94, 'rpm': 2999}))
('engine_002', (1756558, {'factory_id': 'factory_001', 'engine_id': 'engine_002', 'timestamp': '2025-03-20 15:19:45', 'temp_air': 499.27000000000004, 'temp_oil': 89.24, 'temp_exhaust': 759.39, 'vibration': 2.54, 'pressure_1': 149.04, 'pressure_2': 151.2, 'rpm': 2998}))
('engine_001', (1756559, {'factory_id': 'factory_001', 'engine_id': 'engine_001', 'timestamp': '2025-03-20 15:19:50', 'temp_air': 1000.6199999999999, 'temp_oil': 88.94, 'temp_exhaust': 759.77, 'vibration': 2.78, 'pressure_1': 150.19, 'pressure_2': 149.25, 'rpm': 2999}))
('engine_002', (1756559, {'factory_id': 'factory_001', 'engine_id': 'engine_002', 'timestamp': '2025-03-20 15:19:50', 'temp_air': 1004.8700000000001, 'temp_oil': 88.81, 'temp_exhaust': 759.67, 'vibration': 1.88, 'pressure_1': 149.33, 'pressure_2': 149.04, 'rpm': 3000}))