๐จ ์ฒซ DAG ๋ง๋ค๊ธฐ
DAG๋ฅผ ์์ฑํ๊ธฐ ์ํด ๋จผ์ dags ํด๋๋ฅผ ํ๋ ๋ง๋ค๊ณ tutorials.py ๋ผ๋ DAG๋ฅผ ์์ฑํ python ํ์ผ์ ์์ฑํ๋ค.
์ฒ์์ผ๋ก ๋ง๋ค์ด๋ณผ DAG๋ 3๊ฐ์ Task๋ก ๊ตฌ์ฑ๋๋ค.
- Task1 : ํ์ฌ ๋ ์ง์ ์๊ฐ์ ๋ฌธ์์ด๋ก ์ถ๋ ฅ
- Task2 : 5์ด ๋์ ๋๊ธฐ ์ํ ์ ์ง
- Task3 : "Hello Airflow!" ๋ฌธ์์ด์ 5๋ฒ ์ถ๋ ฅ
- Task4 : Gmail๋ก ์์ ์๋ฃ ๋ฉ์ผ ์ ์ก
DAG๊ฐ ํธ๋ฆฌ๊ฑฐ๋๋ฉด Task1์ด ๊ฐ์ฅ ๋จผ์ ์ํ๋๊ณ Task2, Task3๊ฐ ๋ณ๋ ฌ๋ก ์คํ๋๊ณ ๊ทธ ์ดํ Task4๊ฐ ์ํ๋๋ค.
webserver์์ Graph ๊ธฐ๋ฅ์ ์ ๊ณตํ๊ธฐ ๋๋ฌธ์ ์๋์ ๊ฐ์ด ๋ด๊ฐ ๋ง๋ DAG๊ฐ ์ด๋ค ์์๋ก ๋์ํ๋์ง ์๊ฐ์ ์ผ๋ก ํ์ธํ ์ ์๋ค.
๐ช Importing Modules
DAG ๊ฐ์ฒด๋ฅผ ์ ์ํ๊ธฐ ์ํด Python ์คํฌ๋ฆฝํธ ์์ฑ์ ํ์ํ Module์ Importํ๋ค.
import textwrap
from datetime import datetime, timedelta
# DAG object; DAG ์ด๊ธฐํ๋ฅผ ์ํด ์ฌ์ฉ
from airflow.models.dag import DAG
# Operatorsl; DAG ๋์์ ์ํด ์ฌ์ฉ
from airflow.operators.bash import BashOperator
๐ Default Arguments
DAG ์์ฑ์ ํ์ํ Arguments์ ๋ํด์ ์์๋ณด์.
- depends_on_past
- ์ด์ ์คํ์ ์ฑ๊ณต ์ฌ๋ถ์ ๋ฐ๋ผ ํ์ฌ ํ์คํฌ ์คํ์ ๊ฒฐ์ - True: ์ด์ ์คํ์ด ์ฑ๊ณตํด์ผ๋ง ํ์ฌ ํ์คํฌ๊ฐ ์คํ
- False: ์ด์ ์คํ์ ์ฑ๊ณต ์ฌ๋ถ์ ์๊ด์์ด ํ์ฌ ํ์คํฌ๊ฐ ์คํ
- eamil
- ํ์คํฌ ์คํจ๋ ์ฌ์๋ ์ ์๋ฆผ์ ๋ฐ์ ์ด๋ฉ์ผ ์ฃผ์ ๋ชฉ๋ก์ ์ง์ - email_on_failure
- ํ์คํฌ๊ฐ ์คํจํ ๋ ์ด๋ฉ์ผ ์๋ฆผ์ ๋ณด๋ผ์ง ์ฌ๋ถ๋ฅผ ๊ฒฐ์ - True: ํ์คํฌ ์คํจ ์ ์ด๋ฉ์ผ ์๋ฆผ์ ๋ณด๋ธ๋ค.
- False: ํ์คํฌ ์คํจ ์ ์ด๋ฉ์ผ ์๋ฆผ์ ๋ณด๋ด์ง ์๋๋ค.
- email_on_retry
- ํ์คํฌ๊ฐ ์ฌ์๋๋ ๋ ์ด๋ฉ์ผ ์๋ฆผ์ ๋ณด๋ผ์ง ์ฌ๋ถ๋ฅผ ์ค์ - True: ํ์คํฌ ์ฌ์๋ ์ ์ด๋ฉ์ผ ์๋ฆผ์ ๋ณด๋
- False: ํ์คํฌ ์ฌ์๋ ์ ์ด๋ฉ์ผ ์๋ฆผ์ ๋ณด๋ด์ง ์๋๋ค.
- retires
- ํ์คํฌ ์คํจ ์ ์ฌ์๋ ํ์๋ฅผ ์ค์ - retry_delay
- ํ์คํฌ ์คํจ ํ ์ฌ์๋ ์ ๋๊ธฐ ์๊ฐ์ ์ค์ - queue
- ํ์คํฌ๋ฅผ ์คํํ ํ๋ฅผ ์ง์ . Airflow์์ ์ฌ๋ฌ ํ๋ฅผ ์ฌ์ฉํ ์ ์์. - pool
- ํ์คํฌ๊ฐ ์ํ ํ์ ์ง์ . ํ์ ์์ ํ ๋น์ ์ ํํ๋ ๋ฐ ์ฌ์ฉ - priority_weight
- ํ์คํฌ์ ์ฐ์ ์์๋ฅผ ์ง์ . ์ซ์๊ฐ ํด์๋ก ๋์ ์ฐ์ ์์๋ฅผ ๊ฐ์ง - end_date
- ํ์คํฌ๊ฐ ์คํ๋ ์ ์๋ ๋ง์ง๋ง ๋ ์ง๋ฅผ ์ง์ - wait_for_downstream
- ๋ชจ๋ ํ์ ํ์คํฌ๊ฐ ์๋ฃ๋ ๋๊น์ง ํํ ํ์คํฌ๊ฐ ์๋ฃ๋์ง ์๋๋ก ์ค์ - True: ํ์ ํ์คํฌ๊ฐ ๋ชจ๋ ์๋ฃ๋ ๋๊น์ง ๊ธฐ๋ค๋ฆผ
- False: ๊ธฐ๋ค๋ฆฌ์ง ์์
- sla
- ํน์ ์๊ฐ ๋ด์ ํ์คํฌ๊ฐ ์๋ฃ๋์ง ์์ผ๋ฉด SLA ๊ฒฝ๊ณ ๋ฅผ ๋ฐ์์ํด - execution_timeout
- ํ์คํฌ๊ฐ ์ง์ ๋ ์๊ฐ ๋ด์ ์๋ฃ๋์ง ์์ผ๋ฉด ์คํจ๋ก ๊ฐ์ฃผ - on_failure_callback
- ํ์คํฌ๊ฐ ์คํจํ์ ๋ ์ฝ๋ฐฑ ํจ์๋ฅผ ์ง์ - on_success_callback
- ํ์คํฌ๊ฐ ์ฑ๊ณตํ์ ๋ ์คํํ ์ฝ๋ฐฑ ํจ์๋ฅผ ์ง์ - `on_retry_callback
- ํ์คํฌ๊ฐ ์ฌ์๋๋ ๋ ์คํํ ์ฝ๋ฐฑ ํจ์๋ฅผ ์ง์ - sla_miss_callback
- ํ์คํฌ๊ฐ SLA๋ฅผ ์ถฉ์กฑํ์ง ๋ชปํ์ ๋ ์คํํ ์ฝ๋ฐฑ ํจ์๋ฅผ ์ง์ - on_skipped_callback
- ํ์คํฌ๊ฐ ๊ฑด๋๋ฐ๊ธฐ ๋์์ ๋ ์คํํ ์ฝ๋ฐฑ ํจ์๋ฅผ ์ง์ - trigger_rule
- ํ์คํฌ๊ฐ ์คํ๋ ํธ๋ฆฌ๊ฑฐ ๊ท์น์ ์ง์ . ๊ธฐ๋ณธ๊ฐ์ all_success๋ก, ๋ชจ๋ ์์ ํ์คํฌ๊ฐ ์ฑ๊ณตํด์ผ ์คํ- all_sucess: ๋ชจ๋ ์์ ํ์คํฌ๊ฐ ์ฑ๊ณตํด์ผ ์คํ
- all_failed: ๋ชจ๋ ์์ ํ์คํฌ๊ฐ ์คํจํด์ผ ์คํ
- all_done: ๋ชจ๋ ์์ ํ์คํฌ๊ฐ ์๋ฃ๋๋ฉด ์คํ
- one_success: ํ๋์ ์์ ํ์คํฌ๊ฐ ์ฑ๊ณตํ๋ฉด ์คํ
- one_failed: ํ๋์ ์์ ํ์คํฌ๊ฐ ์คํจํ๋ฉด ์คํ
- none_failed: ์คํจํ ์์ ํ์คํฌ๊ฐ ์์ผ๋ฉด ์คํ
- none_skipped: ๊ฑด๋๋ด ์์ ํ์คํฌ๊ฐ ์์ผ๋ฉด ์คํ
- dummy: ํธ๋ฆฌ๊ฑฐ ๊ท์น์ ๋ฌด์ํ๊ณ ํญ์ ์คํ
๐๏ธ Instantiate a DAG
"tutorial"์ด๋ผ๋ ์ด๋ฆ์ DAG๋ฅผ ์ ์ํ๊ณ , ๊ธฐ๋ณธ ์ธ์, ์ค์ผ์ค ๊ฐ๊ฒฉ, ์์ ๋ ์ง, ์ค๋ช , ํ๊ทธ ๋ฑ์ ์ค์ ํ๋ค. DAG ๋ด์ ์ ์๋ ํ์คํฌ๋ค์ ์ด ์ค์ ๋ค์ ๊ธฐ๋ณธ์ผ๋ก ์ฌ์ฉํ๊ฒ ๋๋ค.
์ด DAG๋ ํ๋ฃจ์ ํ ๋ฒ ์คํ๋๋ฉฐ, 2021๋ 1์ 1์ผ ์ดํ์ ์คํ๋ถํฐ ์ค์ผ์ค๋ง๋๋ฉฐ ๊ณผ๊ฑฐ์ ์ค์ผ์ค๋ ์คํ์ ๋ฌด์๋๋ค.
- dag_id: DAG์ ๊ณ ์ ์๋ณ์๋ฅผ ์ค์ ํ๋ค. DAG ์ด๋ฆ์ Webserver, ๋ก๊ทธ์์ ์ฌ์ฉ๋๋ค.
- default_args: DAG ๊ฐ์ฒด ์ ์๋ฅผ ์ํ arguments๋ฅผ ๋์ ๋๋ฆฌ ํํ๋ก ์ ๋ฌํ๋ค.
- description: DAG์ ์ค๋ช ์ ์ค์ ํ๋ค.
- schedule_interval: DAG๊ฐ ์คํ๋ ๊ฐ๊ฒฉ์ ์ค์ ํ๋ค.
- start_date: DAG์ ์์ ๋ ์ง๋ฅผ ์ค์ ํ๋ค. ์ด ๋ ์ง ์ดํ์ DAG๊ฐ ์ค์ผ์ค๋ง๋๋ค.
- catchup: DAG๊ฐ ์์ ๋ ์ง ์ดํ์ ์คํ๋์ง ์์ ๊ธฐ๊ฐ ๋์์ ๋ชจ๋ ์คํ์ ์๋ฃํ ์ง ์ฌ๋ถ๋ฅผ ์ค์ ํ๋ค. False์ ๊ฒฝ์ฐ ๊ณผ๊ฑฐ์ ์ค์ผ์ค๋ ์คํ์ ๋ฌด์ํ๋ค.
- tags: DAG์ ํ๊ทธ๋ฅผ ์ค์ ํ๋ค. Webserver์์ DAG๋ฅผ ๊ฒ์ํ๊ณ ํํฐ๋งํ๋ ๋ฐ ์ฌ์ฉ๋๋ค.
with DAG(
"my_first_dag",
# ์ด ์ธ์๋ค์ ๊ฐ operator์ ์ ๋ฌ๋จ
# operator ์ด๊ธฐํ ์ค์ ๊ฐ task๋ณ๋ก ์ค๋ฒ๋ผ์ด๋ฉ์ด ๊ฐ๋ฅ
default_args={
"depends_on_past": False,
"email": ["kimdong799@gmail.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'on_skipped_callback': another_function, #or list of functions
# 'trigger_rule': 'all_success'
},
description="A simple tutorial DAG",
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
์ค๊ฐ์ DAG๋ฅผ ์์ ํ๋ค๋ฉด ๋ค์์ ๋ช ๋ น์ด๋ฅผ ์คํํ์
airflow db init
airflow scheduler -D
Operator
Operator๋ ์ํฌํ๋ก์ฐ ๋ด์์ ๋จ์ผ Task๋ฅผ ์ ์ํ๋ ๊ตฌ์ฑ ์์์ด๋ค.
๊ฐ Operator๋ ํน์ ์ ํ์ Task๋ฅผ ์ํํ๊ธฐ ์ํ ๋ก์ง์ ์บก์ํํ๋ค.
Operator๋ ๊ธฐ๋ณธ์ ์ผ๋ก BaseOperator๋ฅผ ์์๋ฐ์ผ๋ฉฐ ์์ ์ ์คํํ๊ธฐ ์ํ ๊ธฐ๋ณธ์ ์ธ ์ธ์๋ค์ ํฌํจํ๋ค.
Operator์ ์ข ๋ฅ
- BashOperator: Bash ๋ช ๋ น์ด ๋๋ ์คํฌ๋ฆฝํธ๋ฅผ ์คํ
- PythonOperator: Python ํจ์๋ฅผ ์คํ
- KubernetesPodOperator: Kubernetes Pod์์ ์์ ์ ์คํ
- SimpleHttpOperator: HTTP ์์ฒญ์ ๋ณด๋
- EmailOperator: ์ด๋ฉ์ผ์ ๋ณด๋
Tasks
์์ ์ค๋ช
ํ Operator๋ฅผ ์ด์ฉํ์ฌ ๊ฐ ๋
๋ฆฝ๋ Task๋ฅผ ์ ์ํ๋ค.
์ ์๋ ํ์คํฌ์ ์คํ ์์๋ >> ๊ธฐํธ๋ฅผ ์ด์ฉํ์ฌ ์์ฑํ ์ ์๋ค.
# t1, t2 ๋ฐ t3, t4๋ operator๋ฅผ ์ธ์คํด์คํํ์ฌ ์์ฑ๋ task์ ์์
task1 = BashOperator(
task_id="print_date",
bash_command="date", # ํ์ฌ ๋ ์ง์ ์๊ฐ์ ๋ฌธ์์ด๋ก ์ถ๋ ฅ
)
task2 = BashOperator(
task_id="sleep",
# depends_on_past: False,
bash_command="sleep 5", # 5์ด ๋์ ๋๊ธฐ ์ํ ์ ์ง
retries=3,
)
# Task์ ๋ํ ๋ฌธ์๋ฅผ ์์ฑ
task1.doc_md = textwrap.dedent(
"""\
### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.

**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)
dag.doc_md = __doc__ # DAG์์ ๋ถ๋ถ์ docstrin์ด ์๋ ๊ฒฝ์ฐ
dag.doc_md = """
This is a documentation placed anywhere
""" # ๊ทธ๋ ์ง ์์ ๊ฒฝ์ฐ ์๋์ ๊ฐ์ด ์์ฑ
templated_command = textwrap.dedent(
"""
{% for i in range(5) %}
echo "Hello Airflow!"
{% endfor %}
"""
)
task3 = BashOperator(
task_id="templated",
depends_on_past=False,
bash_command=templated_command,
)
task4 = EmailOperator(
task_id="send_email",
to="kimdong799@gmail.com", # ์์ ์ ์ด๋ฉ์ผ ์ฃผ์
subject="Airflow Alert",
html_content="""<h3>DAG Completed</h3><p>This is a test email sent from an Airflow DAG.</p>""",
)
task1 >> [task2, task3] >> task4
์ง๊ธ๊น์ง ์ ์ํ DAG๋ฅผ Webserver์์ ์ค์๊ฐ์ผ๋ก ๋ชจ๋ํฐ๋ง ํ ์ ์์ด ๊ต์ฅํ ๊ฐํธํ๋ค.
๋ชจ๋ Task๊ฐ ์ ์์ ์ผ๋ก ์ํ๋์ด ๋ฉ์ผ๊น์ง ๋์ฐฉํ ๊ฒ์ ํ์ธํ๋ค.
ref : https://airflow.apache.org/docs/apache-airflow/stable/tutorial/fundamentals.html
ref : https://stackoverflow.com/questions/51829200/how-to-set-up-airflow-send-email
'Airflow' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Airflow] Airflow ์ธํ ํ๊ธฐ (0) | 2024.08.29 |
---|---|
[Airflow] Airflow๊ฐ ๋ฌด์์ธ๊ฐ? (0) | 2024.08.29 |