Skip to main content

Data Pipeline with Apache Airflow

Intouch Profile
Intouch Sangkrajang
Author & Data Engineer

ขั้นตอนการสร้าง Data Pipeline สำหรับดึงข้อมูลจาก API มาจัดเก็บภายใน Data platform

ในคู่มือฉบับนี้จะยกตัวอย่างการนำเข้าข้อมูล CRM Data จาก API มาจัดเก็บใน Data platform โดยใช้ Apache Impala ในการบริหารข้อมูล

สำหรับวิธีการพัฒนา Data pipeline ด้วย Apache Airflow ผู้ใช้งานสามารถศึกษาได้จาก คู่มือ Apache Airflow

ทดลองเชื่อมต่อ API

  1. ศึกษาคู่มือการใช้งาน API ของข้อมูลที่ต้องการนำเข้า เช่น URL, วิธีการส่ง request (HTTP GET, HTTP POST และอื่นๆ), ลักษณะการรับ parameter ของ API, ลักษณะ respond ที่ API ส่งกลับมา เป็นต้น ตัวอย่างเช่น CRM API ใช้ REST API ผ่านทาง HTTP POST โดยรับ parameter เป็น JSON text และส่งข้อมูลกลับมาเป็น JSON text

  2. ก่อนทำการเขียนโค้ด สามารถทดสอบการดึงข้อมูลผ่าน API ผ่านโปรแกรม Postman ได้ โดยกำหนด request method, request parameter ตามที่คู่มือกำหนด

    Postman CRM

พัฒนา DAG

  1. ทำการสร้างไฟล์ DAG และกำหนด parameter ต่างๆ ตามต้องการ

    default_args = {
    'owner': 'DPD',
    'retries': 3,
    'retry_delay': dt.timedelta(minutes=5),
    'email': ['your.email@mea.or.th'],
    'email_on_failure': False,
    'email_on_retry': False,
    }

    with DAG(
    dag_id = "DPD_Fetch_CRM_API_Data",
    default_args = default_args,
    description = "Fetch CRM daily data into TDV via API",
    schedule_interval='@daily',
    start_date = airflow.utils.dates.days_ago(1),
    ) as dag:
  2. สร้างฟังก์ชั่นสำหรับ request ข้อมูล api และเซฟเป็นไฟล์ csv โดย clean ข้อมูลและจัด format ให้เรียบร้อย python สามารถใช้ library requests ในการดึงข้อมูลผ่าน HTTP POST ได้ และใช้ library csv ในการแปลง python dictionary เป็นไฟล์ในรูปแบบ csv ได้

    def _get_data_from_api(
    url: str,
    start_date: str,
    end_date: str,
    outfile: str
    ):
    headers = {
    'Content-Type': 'application/json'
    }
    params = {
    'Value1': '',
    'Value2': '',
    ...
    }
    res = requests.post(url, headers=headers, data = json.dumps(params))
    if res.status_code == 200:
    data = res.json()
    #error handling
    ...
    #cleaning data
    ...
    output_dir = os.path.dirname(outfile)
    os.makedirs(output_dir, exist_ok=True)
    csv_columns = ['Account_name','Account_title', ...]
    with open(outfile, 'w') as f:
    writer = csv.DictWriter(f, fieldnames=csv_columns)
    writer.writeheader()
    for row in data['Result']:
    writer.writerow(row)
    else:
    #raise http error
  3. สร้าง task สำหรับดึงข้อมูลจาก api โดยใช้ function ที่เขียนไว้

    get_data_from_api = PythonOperator(
    task_id="get_data_from_api",
    python_callable=_get_data_from_api,
    op_kwargs = {
    "url": 'http://cc-ai-rb1.meacallcenter.mea.or.th:10310/IntegrationControl/GetTicket',
    "start_date": '{{ macros.ds_add(ds, -1) }}',
    "end_date": '{{ ds }}',
    "outfile": '/shared/crm-api/{{ ds }}-crm-tickets.csv',
    }
    )
  4. สร้าง task สำหรับสร้างตารางใน Apache Impala โดยใช้ JdbcOperator

    create_crm_table = JdbcOperator(
    task_id='create_crm_table',
    jdbc_conn_id='sys-jdbc-impala',
    sql="""
    CREATE EXTERNAL TABLE IF NOT EXISTS crm_data (
    Account_name STRING,
    Account_title STRING,
    ...
    )
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    STORED AS TEXTFILE
    LOCATION '/user/airflow/crm-data'
    TBLPROPERTIES ('skip.header.line.count'='1')
    """
    )
  5. สร้างฟังก์ชั่นสำหรับอัพโหลดไฟล์ csv จาก airflow เข้า HDFS โดยใช้ WebHDFSHook

    def _upload_file(
    infile: str
    ):
    webhdfs = WebHDFSHook(webhdfs_conn_id='sys-hdfs')
    webhdfs.load_file(
    source=infile,
    destination='/user/airflow/crm-data/',
    overwrite='true'
    )
  6. สร้าง task สำหรับอัพโหลดไฟล์เข้า HDFS โดยใช้ function ที่เขียนไว้

    upload_file_to_hdfs = PythonOperator(
    task_id='upload_file_to_hdfs',
    python_callable=_upload_file,
    op_kwargs = {
    'infile': '/shared/crm-api/{{ ds }}-crm-tickets.csv'
    }
    )
  7. สร้าง task ตรวจสอบว่าข้อมูลได้อัพโหลดขึ้นไปเรียบร้อยแล้วหรือไม่ โดยใช้ WebHdfsSensor

    check_hdfs_upload = WebHdfsSensor(
    task_id='check_hdfs_upload',
    webhdfs_conn_id='sys-hdfs',
    filepath='/user/airflow/crm-data/{{ ds }}-crm-tickets.csv'
    )
  8. สร้าง task สำหรับอัพเดทข้อมูลตารางใน Apache Impala

    refresh_data = JdbcOperator(
    task_id='refresh_data',
    jdbc_conn_id='sys-jdbc-impala',
    sql="""
    INVALIDATE METADATA airflow.crm_data
    """
    )
  9. เชื่อม task ต่างๆ เข้าด้วยกัน

    create_crm_table >> get_data_from_api >> upload_file_to_hdfs >> check_hdfs_upload >> refresh_data

ตรวจสอบข้อมูลใน Hue

  1. ตรวจสอบข้อมูลใน Apache Impala ผ่าน Cloudera Hue โดยสามารถศึกษาวิธีการเข้าใช้งานได้ใน คู่มือ Cloudera HueHue

ผู้ใช้งานสามารถดูตัวอย่างโค้ด DAG แบบเต็มได้ ที่นี่ (เข้าใช้งานได้ในเน็ตเวิร์ค กฟน. เท่านั้น)