Skip to main content

การใช้งาน Apache Airflow

Apache Airflow คือ ซอฟต์แวร์ที่ช่วยบริหารจัดการ Data Pipeline คุณจะได้เรียนรู้เทคนิคการเขียนโปรแกรมเพื่อสร้าง Data Pipeline ในหลักสูตร Data Engineering Bootcamp คู่มือนี้จะอธิบายวิธีการเข้าใช้งาน Apache Airflow ภายใน กฟน. และ วิธีการ Interface กับ Data Sources ต่างๆภายในองค์กร

Quick Links

การเข้าใช้งาน Apache Airflow

คุณสามารถใช้งาน Apache Airflow ได้ที่ URL: http://airflow.mea.or.th:8080 โดยจะต้องเข้าใช้งานผ่านเน็ตเวิร์กภายในของ กฟน. หรือ ผ่าน VPN เท่านั้น หลังจากเข้าเว็บไซต์แล้วจะพบหน้า Login ให้ใช้ Username และ Password จากระบบ AD ของ กฟน. (ที่ใช้ Login Computer/WiFi) เพื่อเข้าใช้งาน

Airflow

การนำเข้า DAGs

คุณสามารถนำไฟล์ DAG ที่พัฒนาแล้วมาใช้ใน Airflow ของ กฟน. ได้โดยนำไฟล์ .py ไปใส่ไว้ใน Bucket ชื่อ dags ของ MinIO จากนั้นระบบจะทำการ Sync ไฟล์เข้า Airflow ภายใน 2 - 3 นาทีหลังจากวางไฟล์

Airflow

Delete DAGs

หากคุณต้องการลบ DAGs คุณสามารถลบผ่าน MinIO ได้เลย Airflow จะทำ Clean up DAGs ทุกวันเวลาเที่ยงคืน

การนำไฟล์เข้า Airflow

คุณสามารถนำไฟล์เข้า Airflow พื่อใช้ประมวลผลได้สองวิธี

  • นำไฟล์ไปวางที่ Bucket airflow ใน MinIO จากนั้นคุณสามารถใช้ Python package minio (ใน Airflow ทำการติดตั้งให้เรียบร้อยแล้ว) ในการดาวโหลด หรือ อ่านไฟล์จาก MinIO ได้โดยตรง
  • นำไฟล์ไปวางผ่าน SFTP ที่ Host: airflow.mea.or.th Port: 5436 Username: airflow Password: airflow โดยไฟล์ที่อัพโหลดผ่าน SFTP สามารถเรียกใช้ใน Airflow ได้ที่ Path: /shared ตัวอย่างเช่น คุณอัพโหลดไฟล์ชื่อ mydata.csv ในโฟลเดอร์ Shared ใน SFTP เวลาอ่านไฟล์ใน DAG คุณสามารถใช้ Path: /shared/mydata.csv
Shared Directory

ไดเรกทอรี่ shared สามารถเข้าถึงได้ทุกคนภายในหลักสูตร คุณไม่ควรเก็บข้อมูลส่วนบุคคล หรือ ข้อมูลที่มีความอ่อนไหว

สถาปัตยกรรม Apache Airflow

กฟน. Deploy Apache Airflow ใช้ Celery Executor มี 15 Workers สำหรับการประมวลผล แบบขนานและมีระบบคิวในการรัน Task โดยมี vCPU 92 Cores และ RAM 312GB ซึ่ง

  • เหมาะกับการประมวลผล Python Code ทั่วไป ขนาดข้อมูลไม่ใหญ่มาก การประมวลผลไม่ซับซ้อนมาก
  • หากต้องการประมวลผลข้อมูลขนาดใหญ่ ขอให้ใช้ Apache Spark ใน Data Platform แทน
  • โดย Default กฟน. ลงทุก Connectors ลง Python packages ที่นิยมและจำเป็นต้องใช้ หากพบ module not found คุณสามารถแจ้งปัญหามาที่ ฝวท. เพื่อลง Package เพิ่มได้ คุณไม่ควรจะ pip install เอง เนื่องจาก Airflow มีทั้งหมด 15 Workers และ ควรจะลงให้ครบทุก Worker เพื่อป้องกันปัญหาการรัน Task บน Worker ที่ไม่มี Package

สิทธิการใช้งาน

สิทธิการใช้งานของนักเรียน Data Bootcamp เป็นดังนี้

สิทธิDAGsConnections & Variables
Typical UserView only-
Data AnalystView only-
Data ScientistView only-
Data EngineerCreate, Read, Update, DeleteCreate, Read, Update, Delete
Data Engineer

สิทธิในการใช้งานของ Data Engineer เยอะกว่านักเรียน Data Scientist และ Data Analyst เพื่อให้สะดวกในการทำงานและเรียนรู้ ขอให้ระมัดระวังในการใช้อำนาจที่คุณมี

การเชื่อมต่อกับ Data Sources

โดยทั่วไป Apache Airflow จะไม่ได้เชื่อมต่อ (ไม่มี Route และ ไม่ได้ Allow Firewall) กับ Data Source ในหน่วยงานของคุณ หากคุณต้องการที่จะเชื่อมต่อกับระบบของคุณสามารถทำได้โดยแจ้ง ฝวท. ซึ่งทางทีมงานจะประสานงานกับทีมงานเน็ตเวิร์กใช้เวลาประมาณ 2 - 3 วัน อย่างไรก็ตามคุณสามารถศึกษาวิธีการเขียนโปรแกรม Python เพื่อ Interface กับ Databases และ Data Platform ได้ตามรายละเอียดด้านล่าง

Development Databases

ใน Airflow มี Development Databases ให้คุณใช้เพื่อทำการทดสอบ Operator หรือ ฟังก์ชั่นต่างๆก่อนที่จะใช้งานกับระบบจริงของคุณ โดยคุณมีสิทธิ CRUD เต็มรูปแบบ ปัจจุบันมี Databases 3 ประเภทให้ใช้งาน

  • PostgreSQL ใช้งานได้โดยกำหนด conn_id='dev-postgres'
  • MariaDB ใช้งานได้โดยกำหนด conn_id='dev-mysql'
  • MongoDB ใช้งานได้โดยกำหนด conn_id='dev-mongodb'

Oracle Databases

การเชื่อมต่อ Oracle สามารถทำได้โดยการใช้ Module cxOracle ร่วมกับ sqlalchemy ซึ่งได้ติดตั้งไว้เรียบร้อยแล้ว ใน DAG ของคุณสามารถกำหนดวิธีการเชื่อมต่อ และ ทำการ Query ด้วย pandas ได้ทันที เช่น:

from sqlalchemy import create_engine
import pandas as pd

USERNAME=<DATABASE_USERNAME>
PASSWORD=<DATABASE_PASSWORD>
HOST=<DATABASE_HOST>
PORT='1521'
SERVICE=<SERVICE_NAME>

# Create a connection string
conn_str = f'oracle+cx_oracle://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/?service_name={SERVICE}'
conn = create_engine(conn_str)

pd.read_sql('SELECT * FROM <DATABASE_TABLE>', conn)

Development SFTP

ใน Airflow มี Development SFTP สำหรับให้ทดสอบเชื่อมต่อ อัพโหลด ดาวโหลด เข้าออกระบบ Airflow โดยสามารถใช้งานได้สองวิธี

  • ใช้งานภายใน DAG ได้โดยกำหนด conn_id='dev-sftp' และไฟล์จะไปอยู่ในโฟลเดอร์ /shared ของ Airflow

  • ต่อจาก SFTP Client ภายนอก (Network ภายใน กฟน. หรือ VPN เท่านั้น) เพื่อนำไฟล์เข้า Airflow โดยกำหนด

    ComponentValue
    Hostairflow.mea.or.th
    Port5436
    Usernameairflow
    Passwordairflow

โดยไฟล์ที่ถูกอัพโหลดผ่าน SFTP Client จะเข้าไปอยู่ในระบบ Airflow อัตโนมัติ และ สามารถเรียกใช้งานผ่าน Path /shared

คุณสามารถดูตัวอย่างการใช้งาน SFTP ใน Airflow ได้ใน DAG SFTP

WebHDFS

คุณสามารถเชื่อมต่อกับ HDFS บน Data Platform ได้โดยใช้ conn_id='sys-hdfs' ร่วมกับ WebHDFS API แต่ในสภาพแวดล้อมของ กฟน. นั้น Official Source Code ของ WebHDFS ยังไม่ซัพพอร์ตการเชื่อมแบบ Kerberized HTTPS ซึ่งเป็นระบบที่ กฟน. ใช้งาน คุณมีวิธีการเชื่อมต่อ 2 วิธี

  • ใช้งาน Official WebHDFS และเขียนโปรแกรมเพื่อจัดการ Kerberized Connection เอง
  • ใช้งาน Custom WebHDFS Hook/Sensor ที่ ฝวท. พัฒนาขึ้นเพื่อเชื่อมต่อ WebHDFS ได้ทันทีโดยให้ Import from mea.hooks.webhdfs import WebHDFSHook หรือ from mea.sensors.webhdf import WebHdfsSensor

Source Code ที่ ฝวท. แก้ไขเพื่อให้รองรับกับระบบของ กฟน. อยู่ที่ฟังก์ชั่น get_client โดยเพิ่มให้ KerberosClient สร้าง Connection จาก connection.schema แทนที่จะกำหนดตายตัวเป็น http:

def _get_client(self, connection: Connection) -> Any:
# We modified this line to allow KerberosClient
connection_str = f'{connection.schema}://{connection.host}:{connection.port}'

if _kerberos_security_mode:
client = KerberosClient(connection_str)
else:
proxy_user = self.proxy_user or connection.login
client = InsecureClient(connection_str, user=proxy_user)

return client

คุณสามารถดูตัวอย่างการใช้งานได้ที่ DAG HDFS

Apache Hive

คุณสามารถใช้ Apache Hive บน Data Platform ด้วยการกำหนด conn_id='sys-jdbc-hive' ร่วมกับ JDBCOperator คุณสามารถดูตัวอย่างการใช้งานได้ที่ DAG HDFS & Hive

Cloudera Impala

คุณสามารถใช้ Apache Impala บน Data Platform ด้วยการกำหนด conn_id='sys-jdbc-impala' ร่วมกับ JDBCOperator สามารถดูตัวอย่างได้ที่ DAG Impala

Apache Sqoop

Cloudera ของ กฟน. ไม่สนับสนุน Sqoop2 ที่มีโครงสร้าง Server - Client ทำให้ต้องใช้วิธีอื่นในการใช้งาน Sqoop ซอฟท์แวร์ของ Cloudera ปัจจุบันสามารถใช้งาน Sqoop1 ได้เพียงเวอร์ชั่นเดียว วิธีการใช้ร่วมกับ Apache Airflow ให้ใช้ SSHOperator โดยให้ระบุ ssh_conn_id='sys-ssh-sqoop' โดยมีขั้นตอนการใส่ Command ดังนี้

  • ทำการ Initialize Kerberos principal ด้วยคำสั่ง kinit -kt keytab/airflow.keytab airflow/airflow.mea.or.th
  • ทำการรันคำสั่ง sqoop

ในกรณีที่ต้องกรอก Password ของการต่อ Database ขอให้ระมัดระวัง โดยใช้ Airflow Variable เข้ามาช่วย

from airflow.providers.ssh.operators.ssh import SSHOperator

with DAG(
dag_id="sys__sqoop_hive",
default_args=default_args,
tags=['sample', 'hive', 'sqoop'],
template_searchpath='/shared',
description="Using sqoop to pull data from RDBMS",
start_date=airflow.utils.dates.days_ago(1),
schedule_interval="@daily"
) as dag:
try_sqoop_import = SSHOperator(
task_id='try_sqoop_import',
ssh_conn_id='sys-ssh-sqoop',
command="""kinit -kt keytab/airflow.keytab airflow/airflow.mea.or.th &&\
sqoop list-databases --connect jdbc:sqlserver://<ip>:<port> --username <xxxx> --password <yyyy>
"""
)

try_sqoop_import

ตัวอย่าง Output จาก sqoop

Airflow

Apache Spark

คุณสามารถใช้ Apache Spark บน Data Platform ได้โดยกำหนด conn_id='sys-spark-livy' ร่วมกับ LivyOperator แต่เนื่องจาก Official Source Code ของ ApacheLivyOperator ใช้ HTTPHook เป็นแกนหลักในการเชื่อมต่อกับ REST Services ซึ่งปัจจุบันยังไม่สนับสนุนการ Authenticate ด้วย Kerberos หรือ SPNEGO คุณมีวิธีเชื่อมต่อ 2 วิธี

  • ใช้งาน Official LivyOperator และจัดการการเชื่อมต่อแบบ SPNEGO ด้วยตนเอง
  • ใช้งาน Custom ApacheLivyOperator ที่ ฝวท. พัฒนา เพื่อ Submit Spark Job ได้ทันที โดยให้ Import from mea.operators.livy import LivyOperator

Source Code ที่ ฝวท. แก้ไขใน HTTP Hook คือ (1) Detect การเชื่อมต่อแบบ Kerberos (2) ทำการเชื่อมต่อผ่าน Kerberos ด้วย Python Package requests_gssapi

# We added this line to detect Airflow security config
_kerberos_security_mode = conf.get("core", "security") == "kerberos"
def get_conn(self, headers: Optional[Dict[Any, Any]] = None) -> requests.Session:
"""
Returns http session for use with requests
:param headers: additional headers to be passed through as a dictionary
:type headers: dict
"""
session = requests.Session()

if self.http_conn_id:
conn = self.get_connection(self.http_conn_id)

if conn.host and "://" in conn.host:
self.base_url = conn.host
else:
# schema defaults to HTTP
schema = conn.schema if conn.schema else "http"
host = conn.host if conn.host else ""
self.base_url = schema + "://" + host

if conn.port:
self.base_url = self.base_url + ":" + str(conn.port)
if conn.login:
session.auth = self.auth_type(conn.login, conn.password)
# We added these two lines to build SPNEGO auth session
if _kerberos_security_mode:
session.auth = HTTPSPNEGOAuth()
if conn.extra:
try:
session.headers.update(conn.extra_dejson)
except TypeError:
self.log.warning('Connection to %s has invalid extra field.', conn.host)
if headers:
session.headers.update(headers)

return session

คุณสามารถดูตัวอย่างการใช้งานได้ที่ DAG SPARK

Docker

ในกรณีที่คุณมี Docker Image และต้องการนำมาประมวลผลบน Apache Airflow คุณสามารถนำมาใช้งานกับ DockerOperator ได้ทันทีโดยไม่ต้องระบุ docker_url โดยทั่วไปขั้นตอนในการใช้งานจะเป็นดังนี้

  • สร้าง Application ที่ต้องการ และ Build เป็น Docker Image
  • Push Docker Image ไปที่ Docker Hub
  • เขียน DAG โดยใช้ DockerOperator ระบุชื่อ Image ใน image

คุณสามารถดูตัวอย่างการใช้งาน Docker ได้ที่ DAG Docker

Kubernetes

ปัจจุบัน Apache Airflow Cluster ภายใน กฟน. ยังไม่สนับสนุน Kubernetes

Email Notification

คุณสามารถใช้ Feature Email Notification หรือ EmailOperator โดยไม่ต้อง Config เพิ่มเติม สามารถดูตัวอย่างได้ที่ DAG Email

MinIO Object Storage

MinIO เป็น Object Storage ที่สามารถใช้งานร่วมกับ Amazon S3 Operators ได้โดยกำหนด conn_id='sys-minio' โดยคุณสามารถ Access Bucket ได้ดังนี้

  • READ/WRITE สำหรับ Bucket airflow
  • READ สำหรับ Bucket public

ในกรณีที่คุณต้องการใช้งาน Features ที่ไม่มีใน Operators/Hooks คุณสามารถใช้ Python Package minio ร่วมกับ MinIO Client API และ PythonOperator ในการใช้ Access MinIO ในกรณีนี้คุณจะต้องกำหนด Access Credentials ดังนี้

from airflow.models import Variable
from minio import Minio

client = Minio(
Variable.get('MINIO_HOST'),
access_key=Variable.get('MINIO_SECRET_ACCESS_KEY'),
secret_key=Variable.get('MINIO_SECRET_KEY'),
secure=False,
http_client=urllib3.ProxyManager('http://meaproxy.mea.or.th:80')
)

ดูตัวอย่างการใช้งาน S3Hook และ S3KeySensor ได้ที่ DAG MinIO #1

ดูตัวอย่างการดาวโหลดไฟล์ผ่าน Client API ได้ที่ DAG MinIO #2

Apache HBase & Apache Phoenix

Module happybase ไม่รองรับ Secure Connection ทำให้ไม่สามารถเชื่อมต่อกับ Secure HBase Cluster ของ กฟน. ได้ ปัจจุบัน กฟน. สนับสนุนการใช้งาน HBase 2 รูปแบบได้แก่

  • HBase REST API ใช้งานเหมือน REST API ทั่วไป
  • Apache Phoenix ใช้งาน HBase ด้วยภาษา SQL

ตัวอย่างการใช้งาน HBest REST API ผ่าน SPNEGO

import base64
import json
import requests
from typing import Dict, List
from requests_gssapi import HTTPSPNEGOAuth


HBASE_REST_ENDPOINT = 'https://dpc-cdr-m1.mea.or.th:20550'
DEFAULT_HEADERS = {
'Accept': 'application/json'
}
DEFAULT_AUTH = HTTPSPNEGOAuth()

class HBaseAPI:
DEFAULT_HOST = 'https://dpc-cdr-m1.mea.or.th'
DEFAULT_PORT = 20550
DEFAULT_AUTH = HTTPSPNEGOAuth()

def __init__(self, host: str = DEFAULT_HOST, port: int = DEFAULT_PORT, auth=DEFAULT_AUTH):
self._host = host
self._port = port
self._auth = auth
self._url = f'{self._host}:{self._port}'
self._session = requests.Session()
self._session.auth = self._auth


def _b64encode(self, s: str) -> str:
return base64.b64encode(s.encode('ascii')).decode('ascii')


def _b64decode(self, s: str) -> str:
return base64.b64decode(s.encode('ascii')).decode('ascii')


def get_cluster_version(self) -> str:
try:
response = self._session.get(f'{self._url}/version/cluster', headers={'Accept': 'application/json'})
response.raise_for_status()
except requests.exceptions.HTTPError as e:
raise SystemExit(e)

return response.json().get('Version')


def list_all_tables(self) -> str:
try:
response = self._session.get(f'{self._url}/', headers={'Accept': 'application/json'})
response.raise_for_status()
except requests.exceptions.HTTPError as e:
raise SystemExit(e)

return [table['name'] for table in response.json().get('table')]


def get_table_schema(self, name: str) -> Dict[str, str]:
try:
response = self._session.get(f'{self._url}/{name}/schema', headers={'Accept': 'application/json'})
response.raise_for_status()
except requests.exceptions.HTTPError as e:
raise SystemExit(e)

return response.json()


def create_table(self, name: str, cfs: List[str]) -> None:
headers = {
'Accept': 'application/json',
'Content-Type': 'application/json'
}

data = {
'@name': name,
'ColumnSchema': [{"name": cf} for cf in cfs]
}

try:
response = self._session.put(f'{self._url}/{name}/schema', headers=headers, data=json.dumps(data))
except requests.exceptions.HTTPError as e:
raise SystemExit(e)


def insert_data(self, name: str, data) -> None:
headers = {
'Accept': 'application/json',
'Content-Type': 'application/json'
}

try:
response = self._session.put(f'{self._url}/{name}/false-row-key', headers=headers, data=json.dumps(data))
response.raise_for_status()
except requests.exceptions.HTTPError as e:
raise SystemExit(e)


def get_row(self, name: str, rowkey: str) -> List[Dict[str, str]]:
try:
response = self._session.get(f'{self._url}/{name}/{rowkey}', headers={'Accept': 'application/json'})
except requests.exceptions.HTTPError as e:
raise SystemExit(e)

# Parse row
data = response.json()
return [{'column': self._b64decode(cell['column']),
'timestamp': cell['timestamp'],
'$': self._b64decode(cell['$'])}
for cell in data.get('Row')[0].get('Cell')]


def b64encode(s: str) -> str:
return base64.b64encode(s.encode('ascii')).decode('ascii')


if __name__=='__main__':
### Instantiate a simple HBaseAPI class
hbase = HBaseAPI()

### Example: Get HBase version
print(f'HBase version: {hbase.get_cluster_version()}')

### Example: Create table with column families
hbase.create_table('users_api', cfs=['personal', 'office'])

### Example: Insert data
data = {
"Row": [
{
"key": b64encode('1'),
"Cell": [
{"column": b64encode('personal:name'), "$": b64encode('Noctic Lucis Caelum')},
{"column": b64encode('personal:weapon'), "$": b64encode('Royal Arms')},
{"column": b64encode('office:city'), "$": b64encode('Insomnia')},
{"column": b64encode('office:transport'), "$": b64encode('Regalia Type-F')}
]
},
{
"key": b64encode('2'),
"Cell": [
{"column": b64encode('personal:name'), "$": b64encode('Jill Valentine')},
{"column": b64encode('personal:weapon'), "$": b64encode('Baretta')},
{"column": b64encode('office:city'), "$": b64encode('Raccoon City')},
{"column": b64encode('office:transport'), "$": b64encode('RPD Car')}
]
}
]
}
hbase.insert_data(name='users_api', data=data)

### Example: Query a single row
data = hbase.get_row(name='users_api', rowkey='1')
for cell in data:
print(cell)

ตัวอย่างวิธีการใช้งาน Apache Phoenix ผ่าน SPNEGO

import phoenixdb
import phoenixdb.cursor
from textwrap import dedent
from requests_gssapi import HTTPSPNEGOAuth

# Connect to Apache Phoenix using Kerberos
# If you're running this on MEA Airflow, you don't have to worry about kinit.
database_url = 'https://dpc-cdr-u2.mea.or.th:8765/'

drop_table_query = dedent('''\
DROP TABLE IF EXISTS "users_phoenix"
''')

create_table_query = dedent('''\
CREATE TABLE IF NOT EXISTS "users_phoenix" (
"id" INTEGER NOT NULL PRIMARY KEY,
"personal"."name" VARCHAR,
"personal"."weapon" VARCHAR,
"office"."city" VARCHAR,
"office"."transport" VARCHAR
)
''')

upsert_table_query = dedent('''\
UPSERT INTO "users_phoenix" ("id", "personal"."name", "personal"."weapon", "office"."city", "office"."transport")
VALUES (?, ?, ?, ?, ?)
''')

data_query = dedent('''\
SELECT * FROM "users_phoenix"
''')

with phoenixdb.connect(database_url, autocommit=True, auth=HTTPSPNEGOAuth()) as connection:
with connection.cursor() as cursor:
cursor.execute(drop_table_query)
cursor.execute(create_table_query)
cursor.executemany(upsert_table_query,
[
[1, 'Noctis Lucis Caelum', 'Royal Arms', 'Insomnia', 'Regalia Type-F'],
[2, 'Jill Valentine', 'Baretta', 'Raccoon City', 'RPD Car']
])
cursor.execute(data_query)
for row in cursor:
print(row)