最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

python 3.x - Compare data before save to SQL Server - Stack Overflow

programmeradmin5浏览0评论

My project is to create a DAG that extracts data from a SQL Server database, transforms it, and then loads it into the other database.

I have created a connection in Airflow called Ancillary, which connects to the SQL Server database. I have also created a table in the database called Fee. The DAG is supposed to extract data from the database, transform it, and then load it back into the Fee table.

It already runs, but the query takes too long — about 7–10 minutes — in the extract and load tasks. I want to optimize and reduce the query time by comparing the data in the CSV file with the data in the database before loading/extracting. If the data in the CSV file is the same as the data in the database, the data will not be loaded into the database, and vice versa.

I have tried to compare the data in the CSV file with the data in the database, but it breaks somewhere.

Below is my code for 2 tasks

Extract task:

class Extract_Data(DatabaseConnection):
    """Extract rows from SQL Server into a CSV snapshot.

    The CSV is rewritten only when the freshly queried data differs from
    the existing snapshot, so downstream tasks can skip unchanged data.
    """

    def __init__(self, conn_id):
        super().__init__(conn_id=conn_id)

    def extract(self, query, output_path):
        """Run *query* and refresh the CSV snapshot at *output_path*.

        The connection is always closed, even if the query fails.
        """
        conn = self.connect()
        try:
            df_sql = pd.read_sql(query, conn)
        finally:
            # Original leaked the connection on a query error.
            self.close()

        # Check whether the CSV snapshot already exists
        if os.path.exists(output_path):
            df_csv = pd.read_csv(output_path)
            # A CSV round-trip loses dtype information (int64 can come
            # back as float64/object), which makes a plain
            # DataFrame.equals() report spurious differences and defeats
            # the "skip if unchanged" optimization.  Compare
            # string-normalized copies so only real value changes count.
            unchanged = (
                df_sql.shape == df_csv.shape
                and list(df_sql.columns) == list(df_csv.columns)
                and df_sql.astype(str).reset_index(drop=True).equals(
                    df_csv.astype(str).reset_index(drop=True)
                )
            )
            if not unchanged:
                df_sql.to_csv(output_path, index=False)
        else:
            # First run: no snapshot yet, write the full extract
            df_sql.to_csv(output_path, index=False)

Load task:

class Load_Data(DatabaseConnection):
    """Load a CSV file into a SQL Server table, skipping unchanged data."""

    def __init__(self, conn_string):
        super().__init__(conn_string=conn_string)

    def load(self, input_path, table_name):
        """Write the CSV at *input_path* into *table_name*.

        The table is created on first load; afterwards it is replaced only
        when the CSV content actually differs from what is stored.
        """
        df_csv = pd.read_csv(input_path)
        conn = self.connect()
        try:
            query = f"""
                SELECT COUNT(*) AS count
                FROM INFORMATION_SCHEMA.TABLES
                WHERE TABLE_NAME = '{table_name}'
            """
            df_sql = pd.read_sql(query, conn)
            tbl_exist = df_sql['count'].iloc[0] > 0

            if tbl_exist:
                # BUG FIX: the original compared df_csv with df_sql — the
                # one-row COUNT(*) result — so the frames were never
                # identically labeled and the check was meaningless.
                # Compare against the table's actual contents instead,
                # string-normalized because SQL and CSV dtypes differ.
                df_db = pd.read_sql(f"SELECT * FROM [{table_name}]", conn)
                unchanged = (
                    df_csv.shape == df_db.shape
                    and list(df_csv.columns) == list(df_db.columns)
                    and df_csv.astype(str).reset_index(drop=True).equals(
                        df_db.astype(str).reset_index(drop=True)
                    )
                )
                if not unchanged:
                    df_csv.to_sql(table_name, conn, index=False, if_exists='replace')
            else:
                df_csv.to_sql(table_name, conn, index=False, if_exists='fail')
        finally:
            # Original had a stray "type here" paste artifact after this
            # call (a syntax error) and skipped close() on errors.
            self.close()

Below is my code which I'm trying to run but it throws an error:

ERROR - Can only compare identically-labeled DataFrame objects

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook # Cần cài thêm mssql provider
from datetime import datetime, timedelta
import pandas as pd # Cần cài thêm pandas
import sqlalchemy as sa # Cần cài thêm sqlalchemy
from airflow.decorators import task, dag
import os

# DAG configuration defaults applied to every task
default_args = {
    'owner': 'ctt_2',
    'depends_on_past': False,
    'start_date': datetime(2025, 3, 26),
}

# Fixed file paths for each pipeline stage
EXTRACT = '/opt/airflow/files/extract_data2.csv'  # raw extract snapshot
TRANSFORMED = '/opt/airflow/files/data_transformed2.csv'  # transform-step output
TABLE = 'Fee'  # target SQL Server table
EXPORT = '/opt/airflow/files/ancillary_demo_export.xlsx'  # NOTE(review): not referenced in the visible code — verify it is used elsewhere

# Class quản lý kết nối cơ sở dữ liệu
# Database connection management
class DatabaseConnection:
    """Thin wrapper around either an Airflow ``MsSqlHook`` connection
    (when *conn_id* is given) or a SQLAlchemy connection built from
    *conn_string*.
    """

    def __init__(self, conn_id=None, conn_string=None):
        self.conn_id = conn_id
        self.conn_string = conn_string
        self.connection = None  # live DB connection, if any
        self._engine = None     # SQLAlchemy engine owning that connection

    # Create the connection via hook or engine
    def connect(self):
        """Open (or return the already-open) database connection."""
        if self.connection is None:
            if self.conn_id:
                hook = MsSqlHook(mssql_conn_id=self.conn_id)
                self.connection = hook.get_conn()
            elif self.conn_string:
                # Keep the engine so close() can dispose its pool too —
                # the original leaked one engine per connect() call.
                self._engine = sa.create_engine(self.conn_string)
                self.connection = self._engine.connect()
        return self.connection

    # Close the connection
    def close(self):
        """Close the connection and release the engine pool, if open."""
        if self.connection:
            self.connection.close()
            # BUG FIX: original left the closed handle in place, so a
            # later connect() returned a stale, unusable connection.
            self.connection = None
        if self._engine is not None:
            self._engine.dispose()
            self._engine = None

# Class trích xuất dữ liệu
class Extract_Data(DatabaseConnection):
    def __init__(self, conn_id):
        super().__init__(conn_id=conn_id)

    def extract(self, query, output_path):
        conn = self.connect()
        df_sql = pd.read_sql(query, conn)
        self.close()

        # Kiểm tra xem file CSV đã tồn tại chưa
        if os.path.exists(output_path):
            df_csv = pd.read_csv(output_path, low_memory=False)
            df_diff = df_sqlpare(df_csv,align_axis=0, keep_shape=False, keep_equal=False)
            # So sánh dữ liệu mới với dữ liệu trong file CSV
            if not df_sql.equals(df_csv):
                # df_sql.to_csv(output_path, index=False)
                df_diff.to_csv(output_path, index=False)
        else:
            # Nếu file chưa tồn tại, ghi dữ liệu mới
            df_sql.to_csv(output_path, index=False)

# Class biến đổi dữ liệu
class Transform_Data:
    """Pass-through transform stage.

    Currently applies no transformations: it reads the extracted CSV and
    writes it unchanged to the output path, keeping the ETL stage
    boundaries in place for when real transforms are added.
    """

    def __init__(self, input_path, output_path):
        self.input_path = input_path
        self.output_path = output_path

    def transform(self):
        """Copy the CSV from ``input_path`` to ``output_path``."""
        frame = pd.read_csv(self.input_path)
        frame.to_csv(self.output_path, index=False)

# Class tải dữ liệu
class Load_Data(DatabaseConnection):
    def __init__(self, conn_string):
        super().__init__(conn_string=conn_string)

    # Đọc dữ liệu 
    def load(self, input_path, table_name):
        df_csv = pd.read_csv(input_path,low_memory=False)
        conn = self.connect()
        query= f"""
            SELECT COUNT(*) AS count
            FROM INFORMATION_SCHEMA.TABLES
            WHERE TABLE_NAME = '{table_name}'
        """
        df_sql = pd.read_sql(query, conn)
        tbl_exist = df_sql['count'].iloc[0] > 0
        df_diff = df_csvpare(df_sql,align_axis=0, keep_shape=False, keep_equal=False)
        
        # Lưu vào bảng
        if tbl_exist:
            if not df_sql.equals(df_csv):
                # df_csv.to_sql(table_name, conn, index=False, if_exists='replace')
                df_diff.to_sql(table_name, conn, index=False, if_exists='append')
            else:
                None
        else:
            df_csv.to_sql(table_name, conn, index=False, if_exists='append')


        self.close()


@dag(
    dag_id='Test_Compare',
    default_args=default_args,
    description='Compare Dataframe',
    schedule_interval=timedelta(minutes=8),
    tags=["Compare Dataframe"],
    catchup=False,
)
def ETL():
    """Extract -> transform -> load pipeline for the Fee table.

    Runs every 8 minutes; each stage exchanges data through the fixed
    CSV paths defined at module level (EXTRACT / TRANSFORMED).
    """
    @task 
    # Fetch the source data via the query below
    def extract_data():
        extract = Extract_Data(conn_id='Ancill') # Airflow connection id for the Ancillary SQL Server
        query = """
            SELECT a.dtm_Creation_Date, a.lng_GL_Charges_Id_Nmbr, b.lng_Reservation_Nmbr,e.lng_Leg_Route_Id_Nmbr, g.str_GL_Charge_Type_Ident,
            a.str_GL_Charges_Desc,a.mny_GL_Charges_Amount,a.mny_GL_Charges_Discount, a.mny_GL_Charges_Taxes, a.mny_GL_Charges_Total,
            f.str_Currency_Ident, b.mny_Exchange_Rate, a.mny_GL_Currency_Charges_Total,a.mny_GL_Currency_Charges_Total,
            g.str_Refundable, a.str_Refundable_Charge, a.str_Private_Flag, b.lng_Seats,
            b.lng_Agency_Id_Nmbr, c.str_Agency_Name, c.str_IATA_Nmbr
            FROM [dbo].[tbl_GL_Charges] as a
            JOIN [dbo].[tbl_Res_Header] as b on a.lng_Reservation_Nmbr = b.lng_Reservation_Nmbr
            JOIN [dbo].[tbl_Agency] as c on b.lng_Agency_Id_Nmbr = c.lng_Agency_Id_Nmbr
            JOIN [dbo].[tbl_Res_Legs] as d on a.lng_Res_Legs_Id_Nmbr = d.lng_Res_Legs_Id_Nmbr
            JOIN [dbo].[tbl_Leg_Flights] as e on a.lng_Res_Legs_Id_Nmbr = e.lng_Res_Legs_Id_Nmbr
            JOIN [dbo].[tbl_Currency] as f on a.lng_Currency_Id_Nmbr = f.lng_Currency_Id_Nmbr
            JOIN [dbo].[tbl_GL_Charge_Type_Definition] as g on a.lng_GL_Charge_Type_Id_Nmbr = g.lng_GL_Charge_Type_Id_Nmbr
        """
        extract.extract(query, EXTRACT)

    @task
    def transform_data():
        # Pass-through stage: copies EXTRACT to TRANSFORMED (see Transform_Data)
        transform= Transform_Data(
            input_path=EXTRACT,
            output_path=TRANSFORMED
        )
        transform.transform()

    @task
    def load_data():
        # NOTE(review): credentials are hard-coded in the connection string —
        # move them into an Airflow connection/secret backend
        load = Load_Data(conn_string='mssql+pymssql://abc:[email protected]:51864/TEST')
        load.load(TRANSFORMED, TABLE)


    # Wire the task order: extract, then transform, then load
    extract_data() >> transform_data() >> load_data()

# Instantiate at import time so the Airflow scheduler discovers the DAG
dag=ETL()

Is there an optimal approach for my project? Specifically: how to resolve the error, how to speed up the process, and how to compare the data between the two sources.

发布评论

评论列表(0)

  1. 暂无评论