Section | Section title | Concepts |
---|---|---|
Section3 | Coding your first data pipeline with Airflow | DAG, Operator, Scheduler, Web server, catchup |
Section4 | Databases and Executors | [Executors] SequentialExecutor, LocalExecutor, CeleryExecutor [Databases] SQLite, Postgres |
Section5 | Implementing advanced concepts in Airflow | SubDAG, TaskGroups, XComs, Trigger rules |
VirtualBox allows you to run a virtual machine on your computer.
Everything in Airflow is coded in Python.
At the end of this section, you will be able to code your first data pipeline with Airflow.
Typical use case: create a table, check that an API is available, extract a user from the API, process the user, and store it in a database.
What happens if you have a problem in one part of the pipeline?
What happens if you have hundreds of different data pipelines to manage at the same time?
With Airflow, you can manage and monitor all of your data pipelines from one place and retry the tasks that fail.
from airflow.models import DAG
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}

with DAG('user_processing', schedule_interval='@daily',
         default_args=default_args,
         catchup=False) as dag:
    # Define tasks/operators
Type | Role |
---|---|
Action operators | Execute an action |
Transfer operators | Transfer data |
Sensors | Wait for a condition to be met |
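A minimal sketch (not from the course code) showing an action operator and a sensor; FileSensor, the file path, and the dag_id are illustrative assumptions:

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime

with DAG('operator_types_demo', schedule_interval='@daily',
         start_date=datetime(2020, 1, 1), catchup=False) as dag:

    # Sensor: waits for a condition to be met (here, a file landing on disk)
    wait_for_file = FileSensor(
        task_id='wait_for_file',
        filepath='/tmp/data.csv',
        poke_interval=30
    )

    # Action operator: executes an action (here, a bash command)
    run_cleanup = BashOperator(
        task_id='run_cleanup',
        bash_command='echo "cleaning up"'
    )

    # Transfer operators (from provider packages) move data between systems,
    # e.g. the Postgres-to-Elasticsearch operator built later in these notes.
    wait_for_file >> run_cleanup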
from airflow.models import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}

with DAG('user_processing', schedule_interval='@daily',
         default_args=default_args,
         catchup=False) as dag:

    creating_table = SqliteOperator(
        task_id='creating_table',
        sqlite_conn_id='db_sqlite',
        sql='''
            CREATE TABLE users (
                user_id INTEGER PRIMARY KEY AUTOINCREMENT,
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL,
                username TEXT NOT NULL,
                password TEXT NOT NULL
            );
            '''
    )
or, with email as the primary key instead of an autoincremented user_id:
from airflow.models import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}

with DAG('user_processing', schedule_interval='@daily',
         default_args=default_args,
         catchup=False) as dag:

    creating_table = SqliteOperator(
        task_id='creating_table',
        sqlite_conn_id='db_sqlite',
        sql='''
            CREATE TABLE users (
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL,
                username TEXT NOT NULL,
                password TEXT NOT NULL,
                email TEXT NOT NULL PRIMARY KEY
            );
            '''
    )
Install the required packages, including the providers:
$ pip install 'apache-airflow-providers-sqlite'
In the Airflow web UI, create the db_sqlite connection (Admin > Connections). Then test the task from the terminal:
$ airflow tasks test {dag_id} {task_id} {execution_date in YYYY-MM-DD}
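e.g., to test the creating_table task of the DAG above:
$ airflow tasks test user_processing creating_table 2020-01-01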
from airflow.models import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.http.sensors.http import HttpSensor
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}

with DAG('user_processing', schedule_interval='@daily',
         default_args=default_args,
         catchup=False) as dag:

    creating_table = SqliteOperator(
        task_id='creating_table',
        sqlite_conn_id='db_sqlite',
        sql='''
            CREATE TABLE users (
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL,
                username TEXT NOT NULL,
                password TEXT NOT NULL,
                email TEXT NOT NULL PRIMARY KEY
            );
            '''
    )

    is_api_available = HttpSensor(
        task_id='is_api_available',
        http_conn_id='user_api',
        endpoint='api/'
    )
In the Airflow web UI, create the user_api HTTP connection (Admin > Connections).
In the terminal:
$ pip install 'apache-airflow-providers-http'
$ airflow tasks test {dag_id} {task_id} {execution date in YYYY-MM-DD}
e.g.
$ airflow tasks test user_processing is_api_available 2020-01-01
from airflow.models import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from datetime import datetime
import json

default_args = {
    'start_date': datetime(2020, 1, 1)
}

with DAG('user_processing', schedule_interval='@daily',
         default_args=default_args,
         catchup=False) as dag:

    creating_table = SqliteOperator(
        task_id='creating_table',
        sqlite_conn_id='db_sqlite',
        sql='''
            CREATE TABLE users (
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL,
                username TEXT NOT NULL,
                password TEXT NOT NULL,
                email TEXT NOT NULL PRIMARY KEY
            );
            '''
    )

    is_api_available = HttpSensor(
        task_id='is_api_available',
        http_conn_id='user_api',
        endpoint='api/'
    )

    extracting_user = SimpleHttpOperator(
        task_id='extracting_user',
        http_conn_id='user_api',
        endpoint='api/',
        method='GET',
        response_filter=lambda response: json.loads(response.text),
        log_response=True
    )
$ airflow tasks test user_processing extracting_user 2020-01-01
from airflow.models import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
from pandas import json_normalize
import json

default_args = {
    'start_date': datetime(2020, 1, 1)
}

def _processing_user():
    pass  # implemented in the next step

with DAG('user_processing', schedule_interval='@daily',
         default_args=default_args,
         catchup=False) as dag:

    creating_table = SqliteOperator(
        task_id='creating_table',
        sqlite_conn_id='db_sqlite',
        sql='''
            CREATE TABLE users (
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL,
                username TEXT NOT NULL,
                password TEXT NOT NULL,
                email TEXT NOT NULL PRIMARY KEY
            );
            '''
    )

    is_api_available = HttpSensor(
        task_id='is_api_available',
        http_conn_id='user_api',
        endpoint='api/'
    )

    extracting_user = SimpleHttpOperator(
        task_id='extracting_user',
        http_conn_id='user_api',
        endpoint='api/',
        method='GET',
        response_filter=lambda response: json.loads(response.text),
        log_response=True
    )

    processing_user = PythonOperator(
        task_id='processing_user',
        python_callable=_processing_user
    )
$ airflow tasks test user_processing extracting_user 2020-01-01
from airflow.models import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
from pandas import json_normalize
import json

default_args = {
    'start_date': datetime(2020, 1, 1)
}

def _processing_user(ti):
    users = ti.xcom_pull(task_ids=['extracting_user'])
    if not len(users) or 'results' not in users[0]:
        raise ValueError('User is empty')
    user = users[0]['results'][0]
    processed_user = json_normalize({
        'firstname': user['name']['first'],
        'lastname': user['name']['last'],
        'country': user['location']['country'],
        'username': user['login']['username'],
        'password': user['login']['password'],
        'email': user['email']
    })
    processed_user.to_csv('/tmp/processed_user.csv', index=None, header=False)

with DAG('user_processing', schedule_interval='@daily',
         default_args=default_args,
         catchup=False) as dag:

    creating_table = SqliteOperator(
        task_id='creating_table',
        sqlite_conn_id='db_sqlite',
        sql='''
            CREATE TABLE users (
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL,
                username TEXT NOT NULL,
                password TEXT NOT NULL,
                email TEXT NOT NULL PRIMARY KEY
            );
            '''
    )

    is_api_available = HttpSensor(
        task_id='is_api_available',
        http_conn_id='user_api',
        endpoint='api/'
    )

    extracting_user = SimpleHttpOperator(
        task_id='extracting_user',
        http_conn_id='user_api',
        endpoint='api/',
        method='GET',
        response_filter=lambda response: json.loads(response.text),
        log_response=True
    )

    processing_user = PythonOperator(
        task_id='processing_user',
        python_callable=_processing_user
    )
$ airflow tasks test user_processing processing_user 2020-01-01
from airflow.models import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
from pandas import json_normalize
import json

default_args = {
    'start_date': datetime(2020, 1, 1)
}

def _processing_user(ti):
    users = ti.xcom_pull(task_ids=['extracting_user'])
    if not len(users) or 'results' not in users[0]:
        raise ValueError('User is empty')
    user = users[0]['results'][0]
    processed_user = json_normalize({
        'firstname': user['name']['first'],
        'lastname': user['name']['last'],
        'country': user['location']['country'],
        'username': user['login']['username'],
        'password': user['login']['password'],
        'email': user['email']
    })
    processed_user.to_csv('/tmp/processed_user.csv', index=None, header=False)

with DAG('user_processing', schedule_interval='@daily',
         default_args=default_args,
         catchup=False) as dag:

    creating_table = SqliteOperator(
        task_id='creating_table',
        sqlite_conn_id='db_sqlite',
        sql='''
            CREATE TABLE users (
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL,
                username TEXT NOT NULL,
                password TEXT NOT NULL,
                email TEXT NOT NULL PRIMARY KEY
            );
            '''
    )

    is_api_available = HttpSensor(
        task_id='is_api_available',
        http_conn_id='user_api',
        endpoint='api/'
    )

    extracting_user = SimpleHttpOperator(
        task_id='extracting_user',
        http_conn_id='user_api',
        endpoint='api/',
        method='GET',
        response_filter=lambda response: json.loads(response.text),
        log_response=True
    )

    processing_user = PythonOperator(
        task_id='processing_user',
        python_callable=_processing_user
    )

    storing_user = BashOperator(
        task_id='storing_user',
        bash_command='echo -e ".separator ","\n.import /tmp/processed_user.csv users" | sqlite3 /Users/LeeHyangki/airflow/airflow.db'
    )
$ cd ~/airflow
$ sqlite3 airflow.db
sqlite> SELECT * FROM users;   -- gives nothing yet
$ airflow tasks test user_processing storing_user 2020-01-01
$ sqlite3 airflow.db
sqlite> SELECT * FROM users;   -- now the processed user is there!
creating_table >> is_api_available >> extracting_user >> processing_user >> storing_user
If you run the DAG a second time, creating_table fails because the users table already exists. How can we fix this issue?
Make the task idempotent by fixing the creating_table task in the DAG code as below:
creating_table = SqliteOperator(
    task_id='creating_table',
    sqlite_conn_id='db_sqlite',
    sql='''
        CREATE TABLE IF NOT EXISTS users (
            firstname TEXT NOT NULL,
            lastname TEXT NOT NULL,
            country TEXT NOT NULL,
            username TEXT NOT NULL,
            password TEXT NOT NULL,
            email TEXT NOT NULL PRIMARY KEY
        );
        '''
)
Item | Description | Example |
---|---|---|
start_date | The date/time from which your DAG starts being scheduled | 2020-01-01 10:00AM |
schedule_interval | The frequency at which your data pipeline is triggered, counted from the start_date | every 10 minutes (so the first run is effectively triggered at 2020-01-01 10:10AM) |
execution_date | The beginning of the scheduled period being processed, not the moment the DAG actually runs | 2020-01-01 10:00AM |
catchup
With catchup=True in the DAG() definition, all the non-triggered DAG runs starting from the latest execution date are automatically triggered, NOT from the start_date (only the very first run starts from the start_date).

[Time zone in Airflow]
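A minimal sketch of the option (the dag_id is illustrative):

from airflow.models import DAG
from datetime import datetime

with DAG('catchup_demo', schedule_interval='@daily',
         start_date=datetime(2020, 1, 1),
         catchup=True) as dag:
    # With catchup=True, every missed daily interval between the latest run
    # (or the start_date on the very first run) and now gets backfilled.
    pass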
[Questions]
Which metadata database and which executor does Airflow use by default? Can task_2 and task_3 run at the same time with that default setup?
[Answers]
$ airflow config get-value core sql_alchemy_conn
$ airflow config get-value core executor
By default Airflow uses SQLite with the SequentialExecutor, so task_2 and task_3 cannot run at the same time: tasks are executed one after the other.
[Step1. database]
$ brew install postgresql
$ /usr/local/opt/postgres/bin/createuser -s postgres
$ psql -U postgres
ALTER USER postgres PASSWORD 'postgres';
$ pip install 'apache-airflow[postgres]'
Then, in airflow.cfg, edit the sql_alchemy_conn setting as below:
sql_alchemy_conn = postgresql+psycopg2://postgres:postgres@localhost/postgres
$ airflow db check
[Step2. executor]
In airflow.cfg, change the executor from SequentialExecutor to LocalExecutor.
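The two relevant lines in airflow.cfg then look like this (a sketch; your connection string may differ):

[core]
sql_alchemy_conn = postgresql+psycopg2://postgres:postgres@localhost/postgres
executor = LocalExecutor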
[Next step]
$ airflow db init
$ airflow users create -u admin -p admin -r Admin -f admin -l admin -e admin@airflow.com
$ airflow webserver
$ airflow scheduler
[What is the difference?]
With the LocalExecutor, tasks run as subprocesses on a single machine; with the CeleryExecutor, tasks are distributed to multiple worker machines. worker_concurrency defines how many tasks each Celery worker can run at once.
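For example, in airflow.cfg (16 is the default value; adjust it to your workers):

[celery]
worker_concurrency = 16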
$ pip install 'apache-airflow[celery]'
$ brew install redis
$ sudo nano /usr/local/etc/redis.conf
In redis.conf, change "supervised no" to "supervised systemd".
$ brew services start redis
$ redis-cli ping
In airflow.cfg, set:
executor = CeleryExecutor
broker_url = redis://localhost:6379/0
result_backend = db+postgresql://postgres:postgres@localhost/postgres
$ airflow celery flower
$ airflow celery worker
$ airflow webserver
$ airflow scheduler
[Types of executor]
[Parameters regarding concurrency]
parallelism : the maximum number of task instances that can run at the same time in your entire Airflow instance (32 by default)
dag_concurrency : the maximum number of task instances allowed to run concurrently for a given DAG at any given time
concurrency (DAG argument) : same as dag_concurrency, but applied only to this specific DAG
max_active_runs_per_dag : the maximum number of active DagRuns per DAG at a given time
max_active_runs (DAG argument) : same as max_active_runs_per_dag, but applied only to this specific DAG
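A sketch of the per-DAG parameters (names follow the list above; newer Airflow releases rename concurrency to max_active_tasks):

from airflow import DAG
from datetime import datetime

with DAG('parallel_dag', schedule_interval='@daily',
         start_date=datetime(2020, 1, 1),
         concurrency=2,        # at most 2 task instances of this DAG at once
         max_active_runs=1,    # at most 1 running DagRun of this DAG at a time
         catchup=False) as dag:
    pass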
[SubDAGs]
dags/{dag_file}.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.subdag import SubDagOperator
from subdags.subdag_parallel_dag import subdag_parallel_dag
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}

with DAG('parallel_dag', schedule_interval='@daily',
         default_args=default_args,
         catchup=True) as dag:

    task_1 = BashOperator(
        task_id='task_1',
        bash_command='sleep 3'
    )

    processing = SubDagOperator(
        task_id='processing_tasks',
        subdag=subdag_parallel_dag('parallel_dag', 'processing_tasks', default_args)
    )

    task_4 = BashOperator(
        task_id='task_4',
        bash_command='sleep 3'
    )

    task_1 >> processing >> task_4
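The subdag factory imported above lives in its own module, presumably subdags/subdag_parallel_dag.py given the import path: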
from airflow import DAG
from airflow.operators.bash import BashOperator

def subdag_parallel_dag(parent_dag_id, child_dag_id, default_args):
    with DAG(dag_id=f'{parent_dag_id}.{child_dag_id}', default_args=default_args) as dag:
        task_2 = BashOperator(
            task_id='task_2',
            bash_command='sleep 3'
        )

        task_3 = BashOperator(
            task_id='task_3',
            bash_command='sleep 3'
        )

        return dag
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}

with DAG('parallel_dag', schedule_interval='@daily',
         default_args=default_args,
         catchup=True) as dag:

    task_1 = BashOperator(
        task_id='task_1',
        bash_command='sleep 3'
    )

    with TaskGroup('processing_tasks') as processing_tasks:
        task_2 = BashOperator(
            task_id='task_2',
            bash_command='sleep 3'
        )

        task_3 = BashOperator(
            task_id='task_3',
            bash_command='sleep 3'
        )

    task_4 = BashOperator(
        task_id='task_4',
        bash_command='sleep 3'
    )

    task_1 >> processing_tasks >> task_4
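As a side note (a sketch, not from the course code): TaskGroups can be nested, and task_ids get prefixed with the group names, e.g. 'processing_tasks.spark_tasks.task_3'.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

with DAG('nested_taskgroup_demo', schedule_interval='@daily',
         start_date=datetime(2020, 1, 1), catchup=False) as dag:

    with TaskGroup('processing_tasks') as processing_tasks:
        task_2 = BashOperator(task_id='task_2', bash_command='sleep 3')

        with TaskGroup('spark_tasks') as spark_tasks:
            task_3 = BashOperator(task_id='task_3', bash_command='sleep 3')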
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.subdag import SubDagOperator
from airflow.utils.task_group import TaskGroup
from random import uniform
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}

def _training_model(ti):
    accuracy = uniform(0.1, 10.0)
    print(f'model\'s accuracy: {accuracy}')
    ti.xcom_push(key='model_accuracy', value=accuracy)

def _choose_best_model(ti):
    print('choose best model')
    accuracies = ti.xcom_pull(key='model_accuracy', task_ids=[
        'processing_tasks.training_model_a',
        'processing_tasks.training_model_b',
        'processing_tasks.training_model_c'
    ])
    print(accuracies)

with DAG('xcom_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:

    downloading_data = BashOperator(
        task_id='downloading_data',
        bash_command='sleep 3',
        do_xcom_push=False
    )

    with TaskGroup('processing_tasks') as processing_tasks:
        training_model_a = PythonOperator(
            task_id='training_model_a',
            python_callable=_training_model
        )

        training_model_b = PythonOperator(
            task_id='training_model_b',
            python_callable=_training_model
        )

        training_model_c = PythonOperator(
            task_id='training_model_c',
            python_callable=_training_model
        )

    choose_model = PythonOperator(
        task_id='choose_model',
        python_callable=_choose_best_model
    )

    downloading_data >> processing_tasks >> choose_model
do_xcom_push=False prevents the BashOperator from pushing the last line of its output to XCom.
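As a side note (a sketch, not from the course code): a PythonOperator's return value is pushed to XCom automatically under the key 'return_value', so an explicit xcom_push is only needed for custom keys.

def _training_model():
    accuracy = 0.87
    return accuracy   # pushed as an XCom with key 'return_value'

# and pulled elsewhere without specifying a key:
# ti.xcom_pull(task_ids='processing_tasks.training_model_a')   # -> 0.87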
[BranchPythonOperator]
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.subdag import SubDagOperator
from airflow.utils.task_group import TaskGroup
from airflow.operators.dummy import DummyOperator
from random import uniform
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}

def _training_model(ti):
    accuracy = uniform(0.1, 10.0)
    print(f'model\'s accuracy: {accuracy}')
    ti.xcom_push(key='model_accuracy', value=accuracy)

def _choose_best_model(ti):
    print('choose best model')
    accuracies = ti.xcom_pull(key='model_accuracy', task_ids=[
        'processing_tasks.training_model_a',
        'processing_tasks.training_model_b',
        'processing_tasks.training_model_c'
    ])
    print(accuracies)

def _is_accurate(ti):
    ti.xcom_pull(key='model_accuracy')
    return 'accurate'

with DAG('xcom_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:

    downloading_data = BashOperator(
        task_id='downloading_data',
        bash_command='sleep 3',
        do_xcom_push=False
    )

    with TaskGroup('processing_tasks') as processing_tasks:
        training_model_a = PythonOperator(
            task_id='training_model_a',
            python_callable=_training_model
        )

        training_model_b = PythonOperator(
            task_id='training_model_b',
            python_callable=_training_model
        )

        training_model_c = PythonOperator(
            task_id='training_model_c',
            python_callable=_training_model
        )

    choose_model = PythonOperator(
        task_id='choose_model',
        python_callable=_choose_best_model
    )

    accurate = DummyOperator(
        task_id='accurate'
    )

    inaccurate = DummyOperator(
        task_id='inaccurate'
    )

    downloading_data >> processing_tasks >> choose_model
    choose_model >> [accurate, inaccurate]
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.subdag import SubDagOperator
from airflow.utils.task_group import TaskGroup
from airflow.operators.dummy import DummyOperator
from random import uniform
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}

def _training_model(ti):
    accuracy = uniform(0.1, 10.0)
    print(f'model\'s accuracy: {accuracy}')
    ti.xcom_push(key='model_accuracy', value=accuracy)

def _choose_best_model(ti):
    print('choose best model')
    accuracies = ti.xcom_pull(key='model_accuracy', task_ids=[
        'processing_tasks.training_model_a',
        'processing_tasks.training_model_b',
        'processing_tasks.training_model_c'
    ])
    for accuracy in accuracies:
        if accuracy > 2:
            return 'accurate'
    return 'inaccurate'

with DAG('xcom_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:

    downloading_data = BashOperator(
        task_id='downloading_data',
        bash_command='sleep 3',
        do_xcom_push=False
    )

    with TaskGroup('processing_tasks') as processing_tasks:
        training_model_a = PythonOperator(
            task_id='training_model_a',
            python_callable=_training_model
        )

        training_model_b = PythonOperator(
            task_id='training_model_b',
            python_callable=_training_model
        )

        training_model_c = PythonOperator(
            task_id='training_model_c',
            python_callable=_training_model
        )

    # BranchPythonOperator: the callable returns the task_id(s) to follow
    choose_model = BranchPythonOperator(
        task_id='choose_model',
        python_callable=_choose_best_model
    )

    accurate = DummyOperator(
        task_id='accurate'
    )

    inaccurate = DummyOperator(
        task_id='inaccurate'
    )

    storing = DummyOperator(
        task_id='storing'
    )

    downloading_data >> processing_tasks >> choose_model
    choose_model >> [accurate, inaccurate] >> storing
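With the branch in place, one of accurate/inaccurate is always skipped, so storing (whose default trigger rule is all_success) ends up skipped as well. A sketch of the fix, anticipating the trigger rules covered next, is to replace the storing task above with:

    storing = DummyOperator(
        task_id='storing',
        trigger_rule='none_failed_or_skipped'
    )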
[Trigger rules]
Trigger rules allow you to change the default behavior of your tasks.
You can choose how a task gets triggered by changing its trigger rule.
There are 9 different trigger rules to change the way your tasks are triggered in your data pipelines.
Airflow Documentation : Trigger rules
all_success : (default) all parents have succeeded
all_failed : all parents are in a failed or upstream_failed state
all_done : all parents are done with their execution, whatever their state
one_success : fires as soon as at least one parent has succeeded
one_failed : fires as soon as at least one parent has failed
none_failed : no parent has failed (they have succeeded or been skipped)
none_failed_or_skipped : no parent has failed and at least one parent has succeeded
none_skipped : no parent is in a skipped state
dummy : dependencies are just for show, trigger at will

all_success (default)
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}

with DAG('trigger_rule', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:

    task_1 = BashOperator(
        task_id='task_1',
        bash_command='exit 0',
        do_xcom_push=False
    )

    task_2 = BashOperator(
        task_id='task_2',
        bash_command='exit 0',
        do_xcom_push=False
    )

    task_3 = BashOperator(
        task_id='task_3',
        bash_command='exit 0',
        do_xcom_push=False
    )

    [task_1, task_2] >> task_3
all_failed
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}

with DAG('trigger_rule', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:

    task_1 = BashOperator(
        task_id='task_1',
        bash_command='exit 1',
        do_xcom_push=False
    )

    task_2 = BashOperator(
        task_id='task_2',
        bash_command='exit 1',
        do_xcom_push=False
    )

    task_3 = BashOperator(
        task_id='task_3',
        bash_command='exit 0',
        do_xcom_push=False,
        trigger_rule='all_failed'
    )

    [task_1, task_2] >> task_3
all_done
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}

with DAG('trigger_rule', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:

    task_1 = BashOperator(
        task_id='task_1',
        bash_command='exit 1',
        do_xcom_push=False
    )

    task_2 = BashOperator(
        task_id='task_2',
        bash_command='exit 0',
        do_xcom_push=False
    )

    task_3 = BashOperator(
        task_id='task_3',
        bash_command='exit 0',
        do_xcom_push=False,
        trigger_rule='all_done'
    )

    [task_1, task_2] >> task_3
one_failed
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}

with DAG('trigger_rule', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:

    task_1 = BashOperator(
        task_id='task_1',
        bash_command='exit 1',
        do_xcom_push=False
    )

    task_2 = BashOperator(
        task_id='task_2',
        bash_command='sleep 30',
        do_xcom_push=False
    )

    task_3 = BashOperator(
        task_id='task_3',
        bash_command='exit 0',
        do_xcom_push=False,
        trigger_rule='one_failed'
    )

    [task_1, task_2] >> task_3
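The last part builds a custom Elasticsearch plugin: first a hook (presumably plugins/elasticsearch_plugin/hooks/elastic_hook.py, matching the imports used below), then a custom operator that copies rows from Postgres into Elasticsearch.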
from airflow.hooks.base import BaseHook
from elasticsearch import Elasticsearch

es = Elasticsearch()

class ElasticHook(BaseHook):
    def __init__(self, conn_id='elasticsearch_default', *args, **kwargs):
        super().__init__(*args, **kwargs)
        conn = self.get_connection(conn_id)

        conn_config = {}
        hosts = []

        if conn.host:
            hosts = conn.host.split(',')
        if conn.port:
            conn_config['port'] = int(conn.port)
        if conn.login:
            conn_config['http_auth'] = (conn.login, conn.password)

        self.es = Elasticsearch(hosts, **conn_config)
        self.index = conn.schema

    def info(self):
        return self.es.info()

    def set_index(self, index):
        self.index = index

    def add_doc(self, index, doc_type, doc):
        self.set_index(index)
        res = self.es.index(index=index, doc_type=doc_type, doc=doc)
        return res
from airflow import DAG
from elasticsearch_plugin.hooks.elastic_hook import ElasticHook
from airflow.operators.python import PythonOperator
from datetime import datetime

default_args = {
    'start_date': datetime(2022, 1, 1)
}

def _print_es_info():
    hook = ElasticHook()
    print(hook.info())

with DAG('elasticsearch_dag', schedule_interval='@daily',
         default_args=default_args,
         catchup=False) as dag:

    print_es_info = PythonOperator(
        task_id='print_es_info',
        python_callable=_print_es_info
    )
Testing print_es_info works, but add_doc above passes doc=doc, while the Python Elasticsearch client's index() expects the document as body. The corrected hook:

from airflow.hooks.base import BaseHook
from elasticsearch import Elasticsearch

es = Elasticsearch()

class ElasticHook(BaseHook):
    def __init__(self, conn_id='elasticsearch_default', *args, **kwargs):
        super().__init__(*args, **kwargs)
        conn = self.get_connection(conn_id)

        conn_config = {}
        hosts = []

        if conn.host:
            hosts = conn.host.split(',')
        if conn.port:
            conn_config['port'] = int(conn.port)
        if conn.login:
            conn_config['http_auth'] = (conn.login, conn.password)

        self.es = Elasticsearch(hosts, **conn_config)
        self.index = conn.schema

    def info(self):
        return self.es.info()

    def set_index(self, index):
        self.index = index

    def add_doc(self, index, doc_type, doc):
        self.set_index(index)
        res = self.es.index(index=index, doc_type=doc_type, body=doc)
        return res
from airflow.models import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from elasticsearch_plugin.hooks.elastic_hook import ElasticHook
from contextlib import closing
import json

class PostgresToElasticOperator(BaseOperator):
    def __init__(self, sql, index,
                 postgres_conn_id='postgres_default',
                 elastic_conn_id='elasticsearch_default', *args, **kwargs):
        super(PostgresToElasticOperator, self).__init__(*args, **kwargs)
        self.sql = sql
        self.index = index
        self.postgres_conn_id = postgres_conn_id
        self.elastic_conn_id = elastic_conn_id

    def execute(self, context):
        es = ElasticHook(conn_id=self.elastic_conn_id)
        pg = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        with closing(pg.get_conn()) as conn:
            with closing(conn.cursor()) as cur:
                cur.itersize = 1000
                cur.execute(self.sql)
                for row in cur:
                    doc = json.dumps(row, indent=2)
                    es.add_doc(index=self.index, doc_type='external', doc=doc)
from airflow import DAG
from elasticsearch_plugin.hooks.elastic_hook import ElasticHook
from elasticsearch_plugin.operators.postgres_to_elastic import PostgresToElasticOperator
from airflow.operators.python import PythonOperator
from datetime import datetime

default_args = {
    'start_date': datetime(2022, 1, 1)
}

def _print_es_info():
    hook = ElasticHook()
    print(hook.info())

with DAG('elasticsearch_dag', schedule_interval='@daily',
         default_args=default_args,
         catchup=False) as dag:

    print_es_info = PythonOperator(
        task_id='print_es_info',
        python_callable=_print_es_info
    )

    connections_to_es = PostgresToElasticOperator(
        task_id='connections_to_es',
        sql='SELECT * FROM connection',
        index='connections'
    )

    print_es_info >> connections_to_es
$ psql postgres
$ curl -X GET "http://localhost:9200/connections/_search" -H "Content-type: application/json" -d '{"query":{"match_all":{}}}'
Docker Compose
It is better to separate your containers according to the components you want to run: web server, scheduler, database.