My project is to create a DAG that extracts data from a SQL Server database, transforms it, and then loads it into the other database.
I have created a connection in Airflow called Ancillary, which connects to the SQL Server database. I have also created a table in the database called Fee. The DAG is supposed to extract data from the database, transform it, and then load it back into the Fee table.
It runs already but the query time takes too long, about 7-10 mins, in extract and load task. I want to optimize and reduce the query time by compare data in CSV file and database before load and extract data to database. If data in CSV file is the same with data in database, it will not load data to database and reverse.
I have tried to compare data in CSV file and database but it broke somewhere.
Below is my code for 2 tasks
Extract task:
class Extract_Data(DatabaseConnection):
def __init__(self, conn_id):
super().__init__(conn_id=conn_id)
def extract(self, query, output_path):
conn = self.connect()
df_sql = pd.read_sql(query, conn)
self.close()
# Kiểm tra xem file CSV đã tồn tại chưa
if os.path.exists(output_path):
df_csv = pd.read_csv(output_path)
# So sánh dữ liệu mới với dữ liệu trong file CSV
if not df_sql.equals(df_csv):
df_sql.to_csv(output_path, index=False)
else:
# Nếu file chưa tồn tại, ghi dữ liệu mới
df_sql.to_csv(output_path, index=False)
Load task:
class Load_Data(DatabaseConnection):
def __init__(self, conn_string):
super().__init__(conn_string=conn_string)
# Đọc dữ liệu
def load(self, input_path, table_name):
df_csv = pd.read_csv(input_path)
conn = self.connect()
query= f"""
SELECT COUNT(*) AS count
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_NAME = '{table_name}'
"""
df_sql = pd.read_sql(query, conn)
tbl_exist = df_sql['count'].iloc[0] > 0
if not df_sql.equals(df_csv):
# Lưu vào bảng
if tbl_exist:
df_csv.to_sql(table_name, conn, index=False, if_exists='replace')
else:
df_csv.to_sql(table_name, conn, index=False, if_exists='fail')
else:
None
self.close()type here
Below is my code which I'm trying to run but it throws an error:
ERROR - Can only compare identically-labeled DataFrame objects
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook # Cần cài thêm mssql provider
from datetime import datetime, timedelta
import pandas as pd # Cần cài thêm pandas
import sqlalchemy as sa # Cần cài thêm sqlalchemy
from airflow.decorators import task, dag
import os
# Cấu hình DAG
default_args = {
'owner': 'ctt_2',
'depends_on_past': False,
'start_date': datetime(2025, 3, 26),
}
# Đường dẫn file cố định
EXTRACT = '/opt/airflow/files/extract_data2.csv'
TRANSFORMED = '/opt/airflow/files/data_transformed2.csv'
TABLE = 'Fee'
EXPORT = '/opt/airflow/files/ancillary_demo_export.xlsx'
# Class quản lý kết nối cơ sở dữ liệu
class DatabaseConnection:
def __init__(self, conn_id=None, conn_string=None):
self.conn_id = conn_id
self.conn_string = conn_string
self.connection = None
# Hook tạo kết nối
def connect(self):
if self.conn_id:
hook = MsSqlHook(mssql_conn_id=self.conn_id)
self.connection = hook.get_conn()
elif self.conn_string:
self.connection = sa.create_engine(self.conn_string).connect()
return self.connection
#Đóng kết nối
def close(self):
if self.connection:
self.connection.close()
# Class trích xuất dữ liệu
class Extract_Data(DatabaseConnection):
def __init__(self, conn_id):
super().__init__(conn_id=conn_id)
def extract(self, query, output_path):
conn = self.connect()
df_sql = pd.read_sql(query, conn)
self.close()
# Kiểm tra xem file CSV đã tồn tại chưa
if os.path.exists(output_path):
df_csv = pd.read_csv(output_path, low_memory=False)
df_diff = df_sqlpare(df_csv,align_axis=0, keep_shape=False, keep_equal=False)
# So sánh dữ liệu mới với dữ liệu trong file CSV
if not df_sql.equals(df_csv):
# df_sql.to_csv(output_path, index=False)
df_diff.to_csv(output_path, index=False)
else:
# Nếu file chưa tồn tại, ghi dữ liệu mới
df_sql.to_csv(output_path, index=False)
# Class biến đổi dữ liệu
class Transform_Data:
def __init__(self, input_path, output_path):
self.input_path = input_path
self.output_path = output_path
def transform(self):
df = pd.read_csv(self.input_path)
# Lưu dữ liệu đã xử lý
df.to_csv(self.output_path, index=False)
# Class tải dữ liệu
class Load_Data(DatabaseConnection):
def __init__(self, conn_string):
super().__init__(conn_string=conn_string)
# Đọc dữ liệu
def load(self, input_path, table_name):
df_csv = pd.read_csv(input_path,low_memory=False)
conn = self.connect()
query= f"""
SELECT COUNT(*) AS count
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_NAME = '{table_name}'
"""
df_sql = pd.read_sql(query, conn)
tbl_exist = df_sql['count'].iloc[0] > 0
df_diff = df_csvpare(df_sql,align_axis=0, keep_shape=False, keep_equal=False)
# Lưu vào bảng
if tbl_exist:
if not df_sql.equals(df_csv):
# df_csv.to_sql(table_name, conn, index=False, if_exists='replace')
df_diff.to_sql(table_name, conn, index=False, if_exists='append')
else:
None
else:
df_csv.to_sql(table_name, conn, index=False, if_exists='append')
self.close()
@dag(
dag_id='Test_Compare',
default_args=default_args,
description='Compare Dataframe',
schedule_interval=timedelta(minutes=8),
tags=["Compare Dataframe"],
catchup=False,
)
def ETL():
@task
# Lấy data qua query
def extract_data():
extract = Extract_Data(conn_id='Ancill') # Kết nối đến connection Ancillary trên airflow
query = """
SELECT a.dtm_Creation_Date, a.lng_GL_Charges_Id_Nmbr, b.lng_Reservation_Nmbr,e.lng_Leg_Route_Id_Nmbr, g.str_GL_Charge_Type_Ident,
a.str_GL_Charges_Desc,a.mny_GL_Charges_Amount,a.mny_GL_Charges_Discount, a.mny_GL_Charges_Taxes, a.mny_GL_Charges_Total,
f.str_Currency_Ident, b.mny_Exchange_Rate, a.mny_GL_Currency_Charges_Total,a.mny_GL_Currency_Charges_Total,
g.str_Refundable, a.str_Refundable_Charge, a.str_Private_Flag, b.lng_Seats,
b.lng_Agency_Id_Nmbr, c.str_Agency_Name, c.str_IATA_Nmbr
FROM [dbo].[tbl_GL_Charges] as a
JOIN [dbo].[tbl_Res_Header] as b on a.lng_Reservation_Nmbr = b.lng_Reservation_Nmbr
JOIN [dbo].[tbl_Agency] as c on b.lng_Agency_Id_Nmbr = c.lng_Agency_Id_Nmbr
JOIN [dbo].[tbl_Res_Legs] as d on a.lng_Res_Legs_Id_Nmbr = d.lng_Res_Legs_Id_Nmbr
JOIN [dbo].[tbl_Leg_Flights] as e on a.lng_Res_Legs_Id_Nmbr = e.lng_Res_Legs_Id_Nmbr
JOIN [dbo].[tbl_Currency] as f on a.lng_Currency_Id_Nmbr = f.lng_Currency_Id_Nmbr
JOIN [dbo].[tbl_GL_Charge_Type_Definition] as g on a.lng_GL_Charge_Type_Id_Nmbr = g.lng_GL_Charge_Type_Id_Nmbr
"""
extract.extract(query, EXTRACT)
@task
def transform_data():
transform= Transform_Data(
input_path=EXTRACT,
output_path=TRANSFORMED
)
transform.transform()
@task
def load_data():
load = Load_Data(conn_string='mssql+pymssql://abc:[email protected]:51864/TEST')
load.load(TRANSFORMED, TABLE)
extract_data() >> transform_data() >> load_data()
dag=ETL()
Is there any optimal choice for my project ? It's about resolve the error, speedup the process and how to compare the data between 2 sources