Work/개발 노트

[Kubernetes] Airflow on EKS의 DAG 파일 관리

★용호★ 2023. 11. 26. 16:13

Airflow 사용 준비

Airflow on EKS를 사용하기 위해 Helm을 사용하여 Kubernetes 클러스터에 Arflow를 배포합니다. install 명령을 실행하기 전에 먼저 Kubernetes 네임스페이스를 생성한 후 필요한 파일들을 만들어보겠습니다.
kubectl create ns airflow
 
Airflow에 사용되는 DAG 파일들은 Git, S3, EFS 등 다양한 저장소와 연동해서 사용할 수 있습니다. 각 저장소 정보는 helm의 values.yaml에 설정합니다. 먼저 템플릿으로 사용할 values.yaml 파일을 생성합니다.

 

helm repo add apache-airflow https://airflow.apache.org

helm show values apache-airflow/airflow > values.yaml
 

Git을 사용하는 방법

Git의 Private repository에저장된 DAG 파일을 pull 받을 수 있도록 SSH Key를 사용합니다. 먼저 SSH Key를 생성합니다.
 
ssh-keygen
 
 

 

생성한 Private 키를 Airflow Pod에서 참조할 수 있도록 Kubernetes Secret으로 등록합니다.
 
kubectl create secret generic airflow-ssh-git-secret \
--from-file=gitSshKey=/home/ec2-user/.ssh/id_rsa \
-n airflow
 
 
Git 저장소에는 해당 SSH Key로 접근할 수 있도록 SSH Public 키를 등록합니다. (참고)
키 설정이 완료되었으면 Git 연결 정보를 Helm의 values.yaml 파일에 설정합니다. 앞서 생성한 values.yaml 파일에 GitSync 항목을 찾아 아래와 같이 수정합니다.
dags:
  gitSync:
    enabled: true
    repo: git@github.com:YonghoChoi/airflow-dag.git
    branch: main
    rev: HEAD
    depth: 1
    maxFailures: 0
    subPath: ""
    credentialsSecret: git-credentials
    sshKeySecret: airflow-ssh-git-secret
    wait : 10
    containerName: git-sync
    uid: 65533

 

EFS를 사용하는 방법

먼저 EFS 파일 시스템을 생성합니다. 여기서는 AWS 콘솔을 사용합니다.

 

Amazon EKS 클러스터와 동일한 VPC를 선택한 후 [사용자 지정 버튼]을 클릭합니다. EFS는 Network File System으로 접근 할 대상 EC2 인스턴스에서 네트워크로 접속할 수 있도록 Security Group을 허용해주어야 합니다. 즉, Amazon EKS의 워커 노드들의 Security Group을 EFS의 탑재 대상에 포함시켜 주어야 파일 시스템에 접근이 가능합니다.
이제 EKS 내에서 EFS를 PersistantVolume으로 연동하기 위해 EFS CSI 드라이버를 설치합니다. 그 전에 Kubernetes에 배포된 Pod에서 EFS를 제어할 수 있도록 권한이 부여된 IAM Role을 생성하여 Service Account를 추가합니다.
CLUSTER_NAME=eks-demo-1
REGION=ap-northeast-1
POLICY_NAME=EKS_EFS_CSI_Driver_Policy
ROLE_NAME=EKS_EFS_CSI_Driver_Role

curl -O https://raw.githubusercontent.com/kubernetes-sigs/aws-efs-csi-driver/master/docs/iam-policy-example.json
aws iam create-policy \
    --policy-name $POLICY_NAME \
    --policy-document file://iam-policy-example.json \
    --no-paginate

POLICY_ARN=$(aws iam list-policies --query "Policies[?PolicyName==\`$POLICY_NAME\`].Arn" --output text)

eksctl create iamserviceaccount \
    --cluster $CLUSTER_NAME \
    --namespace kube-system \
    --name efs-csi-controller-sa \
    --attach-policy-arn $POLICY_ARN \
    --approve \
    --region $REGION
 
EFS CSI 드라이버는 AWS 콘솔의 [EKS 페이지] -> [클러스터 선택] -> [추가 기능] 탭에서 손쉽게 설치할 수 있습니다.
EFS CSI 드라이버 설치가 완료되었다면 이제 Kubernetes 클러스터에 Persistant Volume으로 EFS를 생성할 수 있습니다. EFS를 사용하기위해 정적 프로비저닝과 동적 프로비저닝 방식을 사용할 수 있습니다. 여기서는 동적 프로비저닝 방식을 사용합니다.
먼저 StorageClass를 생성합니다. EFS_ID에 해당하는 부분은 Amazon EFS 콘솔의 파일 시스템 리스트에서 확인할 수 있습니다.
 
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: efs-sc
provisioner: efs.csi.aws.com
parameters:
  provisioningMode: efs-ap
  fileSystemId: $(EFS_ID)
  directoryPerms: "700"
매니페스트 파일을 작성하여 PersistantVolume과 PersistantVolumeClaim을 생성합니다.
이제 PersistantVolumeClaim을 생성하여 EFS의 마운트 옵션과 스토리지 사용량에 대한 설정을 합니다. 이 후 Pod에서 해당 정보를 기반으로 EFS를 마운트합니다.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: efs-claim
  namespace: airflow
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: efs-sc
  resources:
    requests:
      storage: 10Gi
 
이제 Helm values 파일을 열고, dags 필드 하위에 아래 내용을 추가합니다.
dags:
  persistence:
    enabled: true
    existingClaim: "airflow-dags"
    accessMode: ReadWriteMany
    size: 10Gi
 
설정이 완료되었으면 아래 명령을 사용하여 Airflow를 배포합니다.
export AIRFLOW_NAME="airflow"
export AIRFLOW_NAMESPACE="airflow"

## install using helm 3
helm upgrade --install \
  "$AIRFLOW_NAME" \
  apache-airflow/airflow \
  --namespace "$AIRFLOW_NAMESPACE" \
  --create-namespace \
  --values ./values.yaml
 

S3를 사용하는 방법

기본적으로 Airflow Helm Chart 설정에는 S3 버킷을 Sync하는 설정이 존재하지 않습니다. 여기서는 Dag 파일을 관리하는 Airflow Scheduler에서 지속적으로 S3 버킷을 Sync할 수 있도록 Sidecar 컨테이너를 실행하는 방식을 사용합니다.

출저 : https://swalloow.github.io/airflow-sidecar/

 

위 그림은 참고용이며 이어지는 설명에서는 Scheduler가 EFS PV를 사용하는 것이 아닌 Pod 내에서 Sidecar 컨테이너와 emtydir 볼륨을 사용하여 데이터를 공유합니다.
이를 위해서는Scheduler Pod에서 S3 데이터를 다운로드 받을 수 있어야하므로 IRSA(IAM Role for Service Account)를 사용하여 S3 권한을 부여해야 합니다. 진행에 앞서 편의를 위해 필요한 변수 값들을 환경변수로 등록하도록 하겠습니다.
ACCOUNT_ID=$(curl -s 169.254.169.254/latest/dynamic/instance-identity/document | jq -r '.accountId')
AWS_REGION=$(curl -s 169.254.169.254/latest/dynamic/instance-identity/document | jq -r '.region')
CLUSTER_NAME=EKS클러스터명
S3_DAG_BUCKET=Dag을 저장할 S3 버킷명

echo "export ACCOUNT_ID=${ACCOUNT_ID}" | tee -a ~/.bash_profile
echo "export AWS_REGION=${AWS_REGION}" | tee -a ~/.bash_profile
 
권한을 할당 받을 Service Account는 IAM OIDC provider로부터 AssumeRole을 통해 자격증명을 받게 되므로 아래 명령을 실행하여 IAM OIDC provider를 설정합니다.
eksctl utils associate-iam-oidc-provider \
    --region ${AWS_REGION} \
    --cluster ${CLUSTER_NAME} \
    --approve
 
Service Account에 할당할 S3 접근 권한을 포함한 IAM Policy 파일을 작성 후 생성합니다.
cat << EOF > s3-policy.json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": "arn:aws:s3:::$S3_DAG_BUCKET"
        },
        {
            "Sid": "List",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:GetObjectVersion"
            ],
            "Resource": "arn:aws:s3:::$S3_DAG_BUCKET/*"
        }
    ]
}
EOF

aws iam create-policy \
    --policy-name S3ReadonlyForAirflowDags \
    --policy-document file://s3-policy.json

 

eksctl 도구를 사용하여 해당 Policy를 attach한 IAM Role을 생성하면서 Kubernetes 내 Service Account를 생성합니다.
eksctl create iamserviceaccount \
    --name airflow-scheduler \
    --namespace airflow \
    --cluster $CLUSTER_NAME \
    --role-name S3BucketForAirflow \
    --attach-policy-arn arn:aws:iam::$ACCOUNT_ID:policy/S3ReadonlyForAirflowDags  \
    --override-existing-serviceaccounts --approve
 
이미 scheduler를 위한 ServiceAccount를 생성했기 때문에 helm 릴리즈 적용 시 별도로 생성되지 않도록 values.yaml 파일을 수정합니다.
... 생략 ...
scheduler:
  ... 생략 ...
  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: false
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~

    # Annotations to add to scheduler kubernetes service account.
    annotations: {}
 
여기까지 문제 없이 진행되었다면 이제 Airflow의 scheduler Pod에서 S3로부터 데이터를 다운로드 받을 수 있는 준비가 된 것입니다. 이제는 주기적으로 S3 버킷을 sync 하는 역할을 하는 Sidecar 컨테이너를 실행하도록 합니다.

(Option 1) Community의 s3-sync 컨테이너 사용

아래 명령을 순차적으로 실행하여 Github으로부터 받아온 Dockerfile을 빌드하고 사용중인 AWS Account의 ECR에 이미지를 push합니다.
# ECR repository 생성
aws ecr create-repository \
--repository-name s3-sync \
--image-scanning-configuration scanOnPush=true \
--region ${AWS_REGION}

# 컨테이너 이미지 push를 위해 ECR 로그인
aws ecr get-login-password --region ${AWS_REGION} | docker login --username AWS --password-stdin $ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com

# Git clone
git clone https://github.com/Swalloow/s3-sync.git
cd s3-sync

# 컨테이너 이미지 빌드 후 push
docker build -t $ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com/s3-sync:latest .
docker push $ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com/s3-sync:latest
echo $ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com/s3-sync:latest
 
이제 Helm chart의 values.yaml 파일을 수정하여 위에서 만든 컨테이너 이미지로 Sidecar 컨테이너를 실행하도록 설정합니다. scheduler Pod와 Sidecar 컨테이너 간 데이터를 공유할 수 있도록 emptydir을 사용하여 Pod 내에서만 데이터 공유가 가능합니다.
... 생략 ...
scheduler:
  ... 생략 ...
  # Launch additional containers into scheduler.
  extraContainers:
    - image: <Account ID>.dkr.ecr.<Region>.amazonaws.com/s3-sync:latest
      name: s3-sync
      # Airflow helm chart로 실행되는 컨테이너는 기본적으로 적은 권한이 부여되어 aws cli를 실행할 수가 없습니다.
      # 이를 위해  securityContext 설정으로 권한을 상승시켰습니다. (보안상 권장하는 방식은 아님)
      securityContext:
        runAsUser: 0
      imagePullPolicy: Always
      volumeMounts:
        - name: dags
          mountPath: /opt/airflow/dags
      env:
        - name: AWS_BUCKET
          value: yonghoch-airflow-on-eks
        - name: KEY_PATH
          value: dags
        - name: DEST_PATH
          value: /opt/airflow/dags
        - name: INTERVAL
          value: "10"
  extraVolumeMounts:
  - name: dags
    mountPath: /opt/airflow/dags
  extraVolumes:
  - name: dags
    emptyDir: {}
 
이제 아래 명령을 실행하여 변경된 설정을 반영합니다.
helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml
 
 
Pod가 정상적으로 실행되면 다음과 같은 로그를 출력하며 주기적으로 S3의 파일을 Sync 합니다.
S3의 파일과 동일하게 /opt/airflow/dags 디렉토리 하위에 sync가 된 것을 확인할 수 있습니다.

(Option2) aws-cli 이미지를 사용하는 방법

대부분의 내용은 Option 1의 설명과 유사하기 때문에 여기서는 Helm chart values.yaml 내용 위주로 설명합니다.
... 생략 ...
scheduler:
  ... 생략 ...
  # Launch additional containers into scheduler.
  extraContainers:
    - args:
      - while true; aws s3 sync --exact-timestamps --delete 's3://yonghoch-airflow-on-eks/dags' '/opt/airflow/dags';
        do sleep 60; done;
      command:
      - /bin/bash
      - -c
      - --
      image: amazon/aws-cli:2.13.19
      name: sync-dags
      securityContext:
        runAsUser: 0
      volumeMounts:
        - name: dags
          mountPath: /opt/airflow/dags
  extraVolumeMounts:
  - name: dags
    mountPath: /opt/airflow/dags
  extraVolumes:
  - name: dags
    emptyDir: {}
 
이제 아래 명령을 실행하여 변경된 설정을 반영합니다.
helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml