I am running Airflow on Kubernetes with the following Chart.yaml file:
apiVersion: v2
name: airflow
description: Umbrella chart for Airflow
type: application
version: 0.0.1
appVersion: "2.1.2"
dependencies:
  - name: airflow
    alias: airflow
    version: 8.9.0
    repository: https://airflow-helm.github.io/charts
and a values.yaml file:
airflow:
  airflow:
    legacyCommands: false
    image:
      repository: apache/airflow
      tag: 2.8.4-python3.9
    executor: KubernetesExecutor
    fernetKey: "7T512UXSSmBOkpWimFHIVb8jK6lfmSAvx4mO6Arehnc"
    webserverSecretKey: "THIS IS UNSAFE!"
    config:
      AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "True"
      AIRFLOW__CORE__LOAD_EXAMPLES: "True"
    users:
      - username: admin
        password: admin
        role: Admin
        email: [email protected]
        firstName: admin
        lastName: admin
    connections: []
    variables: []
    pools: []
    extraPipPackages: []
    extraEnv: []
    extraVolumeMounts: []
    extraVolumes: []
    kubernetesPodTemplate:
      stringOverride: ""
      resources: {}
      extraPipPackages: []
      extraVolumeMounts: []
      extraVolumes: []
  scheduler:
    replicas: 1
    resources: {}
    logCleanup:
      enabled: true
      retentionMinutes: 21600
    livenessProbe:
      enabled: true
      taskCreationCheck:
        enabled: false
        thresholdSeconds: 300
        schedulerAgeBeforeCheck: 180
  web:
    replicas: 1
    resources: {}
    service:
      type: ClusterIP
      externalPort: 8080
    webserverConfig:
      stringOverride: |
        from airflow import configuration as conf
        from flask_appbuilder.security.manager import AUTH_DB
        # the SQLAlchemy connection string
        SQLALCHEMY_DATABASE_URI = conf.get("core", "SQL_ALCHEMY_CONN")
        # use embedded DB for auth
        AUTH_TYPE = AUTH_DB
      existingSecret: ""
  workers:
    enabled: false
  triggerer:
    enabled: true
    replicas: 1
    resources: {}
    capacity: 1000
  flower:
    enabled: false
  logs:
    path: /opt/airflow/logs
    persistence:
      enabled: false
  dags:
    path: /opt/airflow/dags
    persistence:
      enabled: false
    gitSync:
      enabled: true
      repo: "https://tom.mclean:[email protected]/MyOrg/MyOrg/_git/Airflow"
      branch: "main"
      revision: "HEAD"
      syncWait: 60
      depth: 1
      repoSubPath: "dags"
      cloneDepth: 1
      httpSecret: "airflow-http-git-secret"
      httpSecretUsernameKey: username
      httpSecretPasswordKey: password
  ingress:
    enabled: true
    web:
      host: airflow.mydomain
      annotations:
        kubernetes.io/ingress.class: alb
        alb.ingress.kubernetes.io/group.name: grafana
        alb.ingress.kubernetes.io/listen-ports: '[{"HTTP": 80}, {"HTTPS":443}]'
        alb.ingress.kubernetes.io/scheme: internet-facing
        alb.ingress.kubernetes.io/ssl-redirect: '443'
        alb.ingress.kubernetes.io/target-type: ip
  serviceAccount:
    create: true
    name: ""
    annotations: {}
  extraManifests: []
  pgbouncer:
    enabled: true
    resources: {}
    authType: md5
  postgresql:
    enabled: true
    persistence:
      enabled: true
      storageClass: ""
      size: 8Gi
  externalDatabase:
    type: postgres
  redis:
    enabled: false
  externalRedis:
    host: localhost
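(Aside: the fernetKey and webserverSecretKey above are obviously placeholders. A real Fernet key can be generated with the cryptography package, which Airflow itself depends on; this one-liner is just a sketch:

    python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
)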
I then tried to run a job which had parallel tasks:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import random
import time


def heavy_computation(task_number):
    """Simulates a computationally heavy task."""
    sleep_time = random.uniform(0, 1)  # Simulate varying computation times
    time.sleep(sleep_time)
    print(f"Task {task_number} completed after {sleep_time:.2f} seconds")


# Define default args
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 3, 21),
    'retries': 0,
}

# Define the DAG
with DAG(
    'parallel_computation_dag',
    default_args=default_args,
    schedule_interval=None,  # Manual trigger
    catchup=False,
    max_active_tasks=10,  # Allow multiple tasks to run in parallel
) as dag:
    tasks = [
        PythonOperator(
            task_id=f'heavy_task_{i}',
            python_callable=heavy_computation,
            op_kwargs={'task_number': i},
        ) for i in range(20)  # Creates 20 parallel tasks
    ]
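For reference, pod activity can be watched while the DAG runs; the namespace name below is an assumption (use whatever the chart was installed into):

    kubectl get pods -n airflow --watch  # namespace "airflow" is an assumption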
However, only a single pod would run at a time, so the jobs would not run in parallel. Is there a way to change the config to allow multiple pods to run at the same time?
Thanks.
Comment (Lucas M. Uriarte): Where are you running this? Have you tried checking the free resources on your nodes, or checking the scheduler logs?
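(For the scheduler logs the comment mentions, something along these lines works; the deployment name is derived from the Helm release, so airflow-scheduler is an assumption:

    kubectl logs deployment/airflow-scheduler -n airflow --tail=100  # deployment/namespace names are assumptions
)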
1 Answer
I suppose you are using this Helm chart? If so, have a look at its Parameter Reference. There are two specific values that set replica counts:
- scheduler.replicas: Airflow 2.0 allows users to run multiple schedulers. This feature is only recommended for MySQL 8+ and PostgreSQL
- workers.replicas: Number of Airflow Celery workers in StatefulSet.
As you want to run jobs in parallel, enable the workers and set workers.replicas to >= 2 so that at least two workers are deployed and jobs can run concurrently.
Updated values.yaml:
airflow:
  workers:
    enabled: true
    replicas: 2
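To roll this out, a dependency update followed by an upgrade of the umbrella chart should suffice; the release name and namespace here are assumptions:

    helm dependency update
    helm upgrade --install airflow . -f values.yaml -n airflow  # release/namespace names are assumptions

Note that with executor: KubernetesExecutor each task is already launched as its own pod, and Airflow's core settings also cap how many tasks can run at once. If throughput still looks limited, raising those caps through the chart's config map may help; a minimal sketch with arbitrary example values:

    airflow:
      airflow:
        config:
          AIRFLOW__CORE__PARALLELISM: "32"              # max task instances running at once per scheduler
          AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG: "16" # max concurrent tasks within one DAG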