
python - Apache Beam pipeline not running to completion after applying windowing and a trigger and no errors thrown - Stack Overflow


Hello, I am in the process of creating an Apache Beam data pipeline that runs on GCP using Dataflow as the runner. Below is my code. It does not throw any errors, but no data is written into BigQuery: when I check the job execution graph, there seems to be no activity in the "Write to BigQuery" section of the pipeline, yet I can see activity in the data coming from the Pub/Sub stage. I have even gone ahead and added a trigger, but still no data comes out. Please assist. Thanks.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.dataframe.convert import to_dataframe, to_pcollection
from apache_beam import window
import typing
import pandas as pd


# Row schema for parsed records; registered with a RowCoder below so it can be
# used with with_output_types and with the DataFrame conversion.
class BmsSchema(typing.NamedTuple):
  can_data_frame_1: typing.Optional[str]


beam.coders.registry.register_coder(BmsSchema, beam.coders.RowCoder)


# Parses each Pub/Sub message (JSON-encoded bytes) into a dict containing the
# expected columns, with any missing column defaulting to None.
class ParsePubSubMessage(beam.DoFn):
  def process(self, message):
      import json
      all_columns = [
          "can_data_frame_1"
      ]
      main_dict = dict(zip(all_columns, [None] * len(all_columns)))
      record = json.loads(message.decode('utf-8'))
      main_dict.update(record)
      yield {
          all_columns[0]: main_dict[all_columns[0]]}


def run():
  options = PipelineOptions(
      project='dwingestion',
      runner='DataflowRunner',
      streaming=True,
      temp_location='gs://....../temp',
      staging_location='gs://.........../staging',
      region='europe-west1',
      job_name='.........streaming-pipeline-dataflow',
      save_main_session=True,
      flags=['--allow_unsafe_triggers']
  )

  options.view_as(StandardOptions).streaming = True

  input_subscription = 'projects/..._data_streaming'

  table_schema = {
      "fields": [
          {"name": "current_mA", "type": "INTEGER", "mode": "NULLABLE"}
      ]
  }

  with beam.Pipeline(options=options) as p:
      messages = (p
                  | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
                  | 'Apply Fixed Window' >> beam.WindowInto(
                      window.FixedWindows(60),
                      trigger=beam.trigger.AfterWatermark(),
                      allowed_lateness=window.Duration(10),
                      accumulation_mode=beam.trigger.AccumulationMode.DISCARDING
                  )
                  | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
                  | 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
                  )

      # Convert the messages to a DataFrame
      df = to_dataframe(messages)
      

      # Extract and process the 'current_mA' field
      df['current_mA'] = df['can_data_frame_1'].str[4:8].apply(lambda x: int(x, 16) if pd.notna(x) else 0)
      df['current_mA'] = df['current_mA'].where(df['current_mA'] < 0x8000, df['current_mA'] - 0x10000)
      df['current_mA'] = df['current_mA'] * 10

      
      # Convert back to PCollection and map to dictionaries
      transformed_pcol = (
          to_pcollection(df)
          | 'Log Transformed PCollection' >> beam.Map(lambda x: (print(f"Transformed Row: {x}"), x)[1])  # Debugging
          | 'Convert to Dict with Native Types' >> beam.Map(lambda row: {
              "current_mA": int(row.current_mA) if row.current_mA is not None else None
          })
      )

      # Write to BigQuery
      transformed_pcol | 'Write to BigQuery' >> WriteToBigQuery(
          table='..........table_test_all_columns_04',
          write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
          schema=table_schema,
          custom_gcs_temp_location='gs://......_template/temp'
      )


if __name__ == '__main__':
  run()
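One thing I am not sure about (this is my assumption, not something I have confirmed): AfterWatermark() with DISCARDING accumulation only emits a pane once the watermark passes the end of each 60-second window, so if the watermark is not advancing (for example, because Pub/Sub traffic is sparse), everything downstream of the window could look idle. As a diagnostic, I am considering adding an early firing so each window also emits on processing time. A minimal sketch of that variant of the window step:

import apache_beam as beam
from apache_beam import window
from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime, AfterWatermark

# Drop-in replacement for the 'Apply Fixed Window' step: the same 60 s fixed
# window, but with an early firing every 30 s of processing time so panes are
# emitted even before the watermark reaches the end of the window.
windowing = beam.WindowInto(
    window.FixedWindows(60),
    trigger=AfterWatermark(early=AfterProcessingTime(30)),
    allowed_lateness=window.Duration(10),
    accumulation_mode=AccumulationMode.DISCARDING)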
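I am also considering bypassing the to_dataframe / to_pcollection round-trip, to rule it out as the stage where elements stall. A minimal sketch, assuming can_data_frame_1 is a hex string carrying a signed 16-bit current value in characters 4 to 8 (which is what my DataFrame code implies):

import apache_beam as beam

def decode_current_ma(record):
    # Mirrors the DataFrame logic: take hex characters 4:8, treat a missing
    # frame as 0, apply two's-complement sign correction for 16-bit values,
    # then scale by 10.
    frame = record.get('can_data_frame_1')
    raw = int(frame[4:8], 16) if frame is not None else 0
    if raw >= 0x8000:
        raw -= 0x10000
    return {'current_mA': raw * 10}

# Usage (replacing the DataFrame section; applied directly to the dict output
# of the 'Parse PubSub Message' step, so 'Attaching the schema' can be dropped):
#   transformed_pcol = parsed | 'Decode current_mA' >> beam.Map(decode_current_ma)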

