
Airflow on Kubernetes with KubernetesExecutor only running one pod at a time - Stack Overflow


I am running Airflow on Kubernetes with the following Chart.yaml file:

apiVersion: v2
name: airflow
description: Umbrella chart for Airflow
type: application
version: 0.0.1
appVersion: "2.1.2"
dependencies:
  - name: airflow
    alias: airflow
    version: 8.9.0
    repository: https://airflow-helm.github.io/charts

and a values.yaml file:

airflow:
  airflow:
    legacyCommands: false
    image:
      repository: apache/airflow
      tag: 2.8.4-python3.9
    executor: KubernetesExecutor
    fernetKey: "7T512UXSSmBOkpWimFHIVb8jK6lfmSAvx4mO6Arehnc"
    webserverSecretKey: "THIS IS UNSAFE!"
    config:
      AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "True"
      AIRFLOW__CORE__LOAD_EXAMPLES: "True"
    users:
      - username: admin
        password: admin
        role: Admin
        email: [email protected]
        firstName: admin
        lastName: admin
    connections: []
    variables: []
    pools: []
    extraPipPackages: []
    extraEnv: []
    extraVolumeMounts: []
    extraVolumes: []
    kubernetesPodTemplate:
      stringOverride: ""
      resources: {}
      extraPipPackages: []
      extraVolumeMounts: []
      extraVolumes: []
  scheduler:
    replicas: 1
    resources: {}
    logCleanup:
      enabled: true
      retentionMinutes: 21600
    livenessProbe:
      enabled: true
    taskCreationCheck:
      enabled: false
      thresholdSeconds: 300
      schedulerAgeBeforeCheck: 180
  web:
    replicas: 1
    resources: {}
    service:
      type: ClusterIP
      externalPort: 8080
    webserverConfig:
      stringOverride: |
        from airflow import configuration as conf
        from flask_appbuilder.security.manager import AUTH_DB
        
        # the SQLAlchemy connection string
        SQLALCHEMY_DATABASE_URI = conf.get("core", "SQL_ALCHEMY_CONN")
        
        # use embedded DB for auth
        AUTH_TYPE = AUTH_DB
      existingSecret: ""
  
  workers:
    enabled: false

  triggerer:
    enabled: true
    replicas: 1
    resources: {}
    capacity: 1000
  
  flower:
    enabled: false
  
  logs:
    path: /opt/airflow/logs
    persistence:
      enabled: false
  
  dags:
    path: /opt/airflow/dags
    persistence:
      enabled: false
    gitSync:
      enabled: true
      repo: ":[email protected]/MyOrg/MyOrg/_git/Airflow"
      branch: "main"
      revision: "HEAD"
      syncWait: 60
      depth: 1
      repoSubPath: "dags"
      cloneDepth: 1

      httpSecret: "airflow-http-git-secret"
      httpSecretUsernameKey: username
      httpSecretPasswordKey: password

  ingress:
    enabled: true
      
    web:
      host: airflow.mydomain
      annotations:
        kubernetes.io/ingress.class: alb
        alb.ingress.kubernetes.io/group.name: grafana
        alb.ingress.kubernetes.io/listen-ports: '[{"HTTP": 80}, {"HTTPS":443}]'
        alb.ingress.kubernetes.io/scheme: internet-facing
        alb.ingress.kubernetes.io/ssl-redirect: '443'
        alb.ingress.kubernetes.io/target-type: ip
  
  serviceAccount:
    create: true
    name: ""
    annotations: {}
  
  extraManifests: []

  pgbouncer:
    enabled: true
    resources: {}
    authType: md5
  
  postgresql:
    enabled: true
    persistence:
      enabled: true
      storageClass: ""
      size: 8Gi
  
  externalDatabase:
    type: postgres
  
  redis:
    enabled: false
  
  externalRedis:
    host: localhost
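
For context, the umbrella chart is deployed with the standard Helm workflow, roughly as follows (the release name and namespace here are just placeholders):

helm dependency update
helm upgrade --install airflow . --namespace airflow --create-namespace -f values.yaml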

I then tried to run a DAG with parallel tasks:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import random
import time

def heavy_computation(task_number):
    """Simulates a computationally heavy task."""
    sleep_time = random.uniform(0, 1)  # Simulate varying computation times
    time.sleep(sleep_time)
    print(f"Task {task_number} completed after {sleep_time:.2f} seconds")

# Define default args
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 3, 21),
    'retries': 0,
}

# Define the DAG
with DAG(
    'parallel_computation_dag',
    default_args=default_args,
    schedule_interval=None,  # Manual trigger
    catchup=False,
    max_active_tasks=10,  # Allow multiple tasks to run in parallel
) as dag:
    
    tasks = [
        PythonOperator(
            task_id=f'heavy_task_{i}',
            python_callable=heavy_computation,
            op_kwargs={'task_number': i},
        ) for i in range(20)  # Creates 20 parallel tasks
    ]
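
The DAG is triggered manually, either from the UI or with the Airflow CLI from inside the scheduler/web pod; the deployment name below is just a placeholder:

kubectl exec deploy/airflow-web -- airflow dags trigger parallel_computation_dag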

However, only a single task pod would run at a time, so the tasks did not run in parallel. Is there a way to change the configuration so that multiple pods run at the same time?

Thanks.

asked Mar 21 at 22:48 by Tom McLean

  • Where are you running this? Have you checked the free resources on your nodes or the scheduler logs? – Lucas M. Uriarte, Mar 25 at 10:31

1 Answer

I suppose you are using the airflow-helm community chart (the one referenced in your Chart.yaml)? If so, have a look at its Parameter Reference. There are two values in particular that set the replica count:

  • scheduler.replicas: Airflow 2.0 allows users to run multiple schedulers. This feature is only recommended for MySQL 8+ and PostgreSQL
  • workers.replicas: Number of Airflow Celery workers in StatefulSet.

Since you want to run tasks in parallel, enabling the workers and setting replicas to >= 2 deploys multiple workers, so tasks can run in parallel.

Updated values.yaml:

airflow:
  workers:
    enabled: true
    replicas: 2
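
Note that with the KubernetesExecutor each task already runs in its own pod, so if tasks still do not run in parallel after scaling, the Airflow core parallelism settings may also need raising. A minimal sketch using the chart's airflow.config map (the values shown are only examples, not recommendations):

airflow:
  airflow:
    config:
      # upper bound on concurrently running task instances per scheduler
      AIRFLOW__CORE__PARALLELISM: "32"
      # upper bound on concurrently running task instances per DAG
      AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG: "16"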