n8n Thai
by n8n Thai

สร้าง ETL Pipeline ด้วย n8n: ดึง แปลง โหลดข้อมูล

วิธีออกแบบและสร้าง ETL Pipeline ด้วย n8n ตั้งแต่ Extract ข้อมูล Transform และ Load ไปยังปลายทาง พร้อม Pattern และ Best Practice

สร้าง ETL Pipeline ด้วย n8n: ดึง แปลง โหลดข้อมูล

ETL ย่อมาจาก Extract, Transform, Load ซึ่งเป็นกระบวนการย้ายข้อมูลจากที่หนึ่งไปอีกที่หนึ่งโดยผ่านการแปลงให้อยู่ในรูปแบบที่ต้องการ ในอดีต ETL ต้องอาศัยทีม Data Engineer และ Tool เฉพาะทาง แต่ด้วย n8n ทีมที่ไม่มีพื้นหลัง Data Engineering ก็สามารถสร้าง Pipeline ที่ใช้งานได้จริงได้ภายในเวลาไม่กี่ชั่วโมง

ก่อนจะสร้าง Pipeline ต้องเข้าใจก่อนว่าแต่ละขั้นตอนของ ETL ทำอะไร:

Extract (ดึง): ดึงข้อมูลจากแหล่งต้นทาง อาจเป็น Database, API, File หรือ Spreadsheet ขั้นตอนนี้ต้องจัดการ Pagination, Authentication และ Rate Limit ของแหล่งข้อมูล

Transform (แปลง): ทำความสะอาดข้อมูล เปลี่ยน Format, Validate ค่า, เติมข้อมูลที่ขาดหาย, รวม Field หรือคำนวณค่าใหม่ ขั้นตอนนี้คือหัวใจของ ETL

Load (โหลด): เขียนข้อมูลที่ผ่านการแปลงแล้วลงปลายทาง อาจเป็น Database อื่น, Data Warehouse, Spreadsheet หรือ API

สถาปัตยกรรม ETL Pipeline ใน n8n

Pipeline ที่ดีควรแบ่งออกเป็น Workflow ย่อยๆ ไม่ควยยัดทุกอย่างใน Workflow เดียว

แนวทางที่แนะนำ:

  • 1 Workflow = 1 Source ถ้าดึงจาก 3 ระบบ ให้มี 3 Workflow แยกกัน ง่ายต่อการ Debug และ Maintain
  • แยก Transform ออกมา ถ้า Transform Logic ซับซ้อน ให้แยกเป็น Sub-workflow ที่เรียกได้จาก Workflow อื่น
  • Log ทุก Error ให้มี Error Handler ที่บันทึกข้อผิดพลาดลง Database หรือส่งแจ้งเตือน

Phase 1: Extract — ดึงข้อมูล

กรณีที่ 1: ดึงจาก Database

ใช้ MySQL, PostgreSQL หรือ Supabase Node เพื่อดึงข้อมูล ควร Query เฉพาะข้อมูลที่เปลี่ยนแปลงหลังจาก Last Run เพื่อประหยัดเวลาและทรัพยากร

SELECT * FROM orders
WHERE updated_at > '{{ $vars.last_run_timestamp }}'
ORDER BY updated_at ASC
LIMIT 1000

กรณีที่ 2: ดึงจาก API พร้อม Pagination

API ส่วนใหญ่แบ่งผลลัพธ์เป็นหน้า n8n รองรับ Pagination ใน HTTP Request Node โดยตรง:

  1. ใช้ HTTP Request Node
  2. เปิด Pagination Option
  3. เลือก Pagination Mode: Response Contains Next URL หรือ Offset/Page
  4. n8n จะดึงข้อมูลทุกหน้าอัตโนมัติจนกว่าจะหมด

กรณีที่ 3: ดึงจาก File (CSV, JSON)

ใช้ Read/Write Files from Disk Node หรือ FTP Node เพื่อดึงไฟล์ จากนั้นใช้ Spreadsheet File Node หรือ Code Node เพื่อ Parse

Phase 2: Transform — แปลงข้อมูล

Transformation ที่พบบ่อย:

Data Cleaning — ทำความสะอาด

const items = $input.all();
return items.map(item => {
  const d = item.json;
  return {
    json: {
      id: d.id,
      name: (d.name || '').trim(),
      email: (d.email || '').toLowerCase().trim(),
      phone: d.phone ? d.phone.replace(/[^0-9+]/g, '') : null,
      // เติมค่า Default ถ้า null
      status: d.status ?? 'unknown',
    }
  };
});

Data Enrichment — เติมข้อมูลเพิ่มเติม

บางครั้งต้องดึงข้อมูลเพิ่มจากแหล่งอื่นเพื่อเติมให้ครบ เช่น แปลง Province Code เป็นชื่อจังหวัด

const provinceMap = {
  'BKK': 'กรุงเทพมหานคร',
  'CNX': 'เชียงใหม่',
  'KKC': 'ขอนแก่น',
  'HYI': 'หาดใหญ่',
};

const items = $input.all();
return items.map(item => ({
  json: {
    ...item.json,
    province_name: provinceMap[item.json.province_code] ?? item.json.province_code
  }
}));

Deduplication — กรอง Duplicate ออก

ก่อน Load ควรตรวจสอบ Duplicate โดยใช้ Field ที่ Unique เช่น email หรือ order_id

const items = $input.all();
const seen = new Set();
const unique = [];

for (const item of items) {
  const key = item.json.email; // หรือ field ที่ unique
  if (!seen.has(key)) {
    seen.add(key);
    unique.push(item);
  }
}

return unique;

Data Type Casting — แปลง Type

return $input.all().map(item => ({
  json: {
    ...item.json,
    price: parseFloat(item.json.price) || 0,
    quantity: parseInt(item.json.quantity) || 0,
    is_active: item.json.is_active === 'true' || item.json.is_active === true,
    created_at: new Date(item.json.created_at).toISOString(),
  }
}));

Phase 3: Load — โหลดข้อมูลลงปลายทาง

Strategy 1: Insert Only (Append)

เหมาะสำหรับ Log Table หรือข้อมูลที่ไม่เคย Update เช่น Transaction Records

INSERT INTO data_warehouse.transactions
(id, amount, user_id, created_at)
VALUES ({{ $json.id }}, {{ $json.amount }}, {{ $json.user_id }}, '{{ $json.created_at }}')

Strategy 2: Upsert (Insert หรือ Update)

เหมาะสำหรับ Sync ข้อมูล Master เช่น Users, Products ที่อาจมีการแก้ไข

INSERT INTO users (id, name, email, updated_at)
VALUES ({{ $json.id }}, '{{ $json.name }}', '{{ $json.email }}', NOW())
ON DUPLICATE KEY UPDATE
  name = VALUES(name),
  email = VALUES(email),
  updated_at = VALUES(updated_at)

Strategy 3: Truncate and Reload

ลบข้อมูลเก่าแล้ว Insert ใหม่ทั้งหมด เหมาะสำหรับตารางขนาดเล็กที่ต้องการความแน่ใจว่าข้อมูลตรงกับ Source เสมอ

ระวัง: วิธีนี้ทำให้ข้อมูลหายชั่วคราวระหว่าง Truncate และ Insert ไม่เหมาะกับระบบที่มีการ Query ตลอดเวลา

จัดการ Error ใน ETL Pipeline

ETL Pipeline ต้องรับมือกับ Error อย่างชาญฉลาด เพราะถ้า Workflow หยุดกลางคันอาจทำให้ข้อมูลไม่สมบูรณ์

วิธีจัดการ Error ใน n8n:

  1. เปิด Continue On Fail ใน Node ที่อาจล้มเหลว เช่น HTTP Request หรือ Database Write
  2. ใช้ IF Node ตรวจสอบว่า Error เกิดขึ้นหรือไม่
  3. Log Error ลง Database หรือส่งแจ้งเตือนไป LINE/Slack
  4. Retry เฉพาะ Item ที่ล้มเหลวใน Round ถัดไป

ตัวอย่าง Error Handling Pattern:

[HTTP Request] → [IF: มี error?]
                     ↓ Yes          ↓ No
              [Log to DB]      [Process Data]
              [Send Alert]

Tracking Last Run

เพื่อให้ Pipeline ดึงเฉพาะข้อมูลที่ใหม่กว่าครั้งที่แล้ว ต้องบันทึก Timestamp ของ Last Run ไว้

วิธีที่ 1: ใช้ n8n Variables บันทึก last_run ลงใน n8n Variables แล้วอ่านในครั้งถัดไป

วิธีที่ 2: ใช้ State Table ใน Database สร้างตาราง pipeline_state ที่เก็บ pipeline_name, last_run_at, last_processed_id แล้ว Update หลัง Load สำเร็จ

วิธีที่ 3: ใช้ Workflow ID + Static Data n8n รองรับ Static Data ที่เก็บไว้ระหว่าง Execution ผ่าน $getWorkflowStaticData('global')

ตัวอย่าง Pipeline จริง: Sync ข้อมูลออร์เดอร์

โจทย์: ระบบขายออนไลน์เก็บข้อมูลใน MySQL, ต้องการ Sync ทุกชั่วโมงไป Google Sheets เพื่อให้ทีม Management ดูรายงาน

Workflow:

  1. Schedule Trigger — ทุกชั่วโมง
  2. MySQL Node — ดึง Orders ที่ updated_at > Last Run
  3. Code Node — แปลง Format วันที่ คำนวณยอดรวม เติมชื่อสถานะภาษาไทย
  4. Google Sheets Node — Append Rows ลง Sheet
  5. MySQL Node — Update Last Run Timestamp ใน State Table
  6. IF Node (Error Branch) — ถ้ามี Error ส่ง LINE แจ้งเตือน

Pipeline นี้ใช้เวลาสร้างไม่เกิน 1-2 ชั่วโมง แต่ให้ประโยชน์ทุกวันโดยที่ไม่ต้อง Export ด้วยมือ

อยากเรียน n8n แบบเป็นระบบ ตั้งแต่เริ่มต้นจนสร้าง Workflow ใช้งานจริงได้ ลองดู คอร์สสอน n8n ที่ aiunlock.co

Related posts