สร้าง ETL Pipeline ด้วย n8n: ดึง แปลง โหลดข้อมูล
วิธีออกแบบและสร้าง ETL Pipeline ด้วย n8n ตั้งแต่ Extract ข้อมูล Transform และ Load ไปยังปลายทาง พร้อม Pattern และ Best Practice
ETL ย่อมาจาก Extract, Transform, Load ซึ่งเป็นกระบวนการย้ายข้อมูลจากที่หนึ่งไปอีกที่หนึ่งโดยผ่านการแปลงให้อยู่ในรูปแบบที่ต้องการ ในอดีต ETL ต้องอาศัยทีม Data Engineer และ Tool เฉพาะทาง แต่ด้วย n8n ทีมที่ไม่มีพื้นหลัง Data Engineering ก็สามารถสร้าง Pipeline ที่ใช้งานได้จริงได้ภายในเวลาไม่กี่ชั่วโมง
ก่อนจะสร้าง Pipeline ต้องเข้าใจก่อนว่าแต่ละขั้นตอนของ ETL ทำอะไร:
Extract (ดึง): ดึงข้อมูลจากแหล่งต้นทาง อาจเป็น Database, API, File หรือ Spreadsheet ขั้นตอนนี้ต้องจัดการ Pagination, Authentication และ Rate Limit ของแหล่งข้อมูล
Transform (แปลง): ทำความสะอาดข้อมูล เปลี่ยน Format, Validate ค่า, เติมข้อมูลที่ขาดหาย, รวม Field หรือคำนวณค่าใหม่ ขั้นตอนนี้คือหัวใจของ ETL
Load (โหลด): เขียนข้อมูลที่ผ่านการแปลงแล้วลงปลายทาง อาจเป็น Database อื่น, Data Warehouse, Spreadsheet หรือ API
สถาปัตยกรรม ETL Pipeline ใน n8n
Pipeline ที่ดีควรแบ่งออกเป็น Workflow ย่อยๆ ไม่ควยยัดทุกอย่างใน Workflow เดียว
แนวทางที่แนะนำ:
- 1 Workflow = 1 Source ถ้าดึงจาก 3 ระบบ ให้มี 3 Workflow แยกกัน ง่ายต่อการ Debug และ Maintain
- แยก Transform ออกมา ถ้า Transform Logic ซับซ้อน ให้แยกเป็น Sub-workflow ที่เรียกได้จาก Workflow อื่น
- Log ทุก Error ให้มี Error Handler ที่บันทึกข้อผิดพลาดลง Database หรือส่งแจ้งเตือน
Phase 1: Extract — ดึงข้อมูล
กรณีที่ 1: ดึงจาก Database
ใช้ MySQL, PostgreSQL หรือ Supabase Node เพื่อดึงข้อมูล ควร Query เฉพาะข้อมูลที่เปลี่ยนแปลงหลังจาก Last Run เพื่อประหยัดเวลาและทรัพยากร
SELECT * FROM orders
WHERE updated_at > '{{ $vars.last_run_timestamp }}'
ORDER BY updated_at ASC
LIMIT 1000
กรณีที่ 2: ดึงจาก API พร้อม Pagination
API ส่วนใหญ่แบ่งผลลัพธ์เป็นหน้า n8n รองรับ Pagination ใน HTTP Request Node โดยตรง:
- ใช้ HTTP Request Node
- เปิด Pagination Option
- เลือก Pagination Mode: Response Contains Next URL หรือ Offset/Page
- n8n จะดึงข้อมูลทุกหน้าอัตโนมัติจนกว่าจะหมด
กรณีที่ 3: ดึงจาก File (CSV, JSON)
ใช้ Read/Write Files from Disk Node หรือ FTP Node เพื่อดึงไฟล์ จากนั้นใช้ Spreadsheet File Node หรือ Code Node เพื่อ Parse
Phase 2: Transform — แปลงข้อมูล
Transformation ที่พบบ่อย:
Data Cleaning — ทำความสะอาด
const items = $input.all();
return items.map(item => {
const d = item.json;
return {
json: {
id: d.id,
name: (d.name || '').trim(),
email: (d.email || '').toLowerCase().trim(),
phone: d.phone ? d.phone.replace(/[^0-9+]/g, '') : null,
// เติมค่า Default ถ้า null
status: d.status ?? 'unknown',
}
};
});
Data Enrichment — เติมข้อมูลเพิ่มเติม
บางครั้งต้องดึงข้อมูลเพิ่มจากแหล่งอื่นเพื่อเติมให้ครบ เช่น แปลง Province Code เป็นชื่อจังหวัด
const provinceMap = {
'BKK': 'กรุงเทพมหานคร',
'CNX': 'เชียงใหม่',
'KKC': 'ขอนแก่น',
'HYI': 'หาดใหญ่',
};
const items = $input.all();
return items.map(item => ({
json: {
...item.json,
province_name: provinceMap[item.json.province_code] ?? item.json.province_code
}
}));
Deduplication — กรอง Duplicate ออก
ก่อน Load ควรตรวจสอบ Duplicate โดยใช้ Field ที่ Unique เช่น email หรือ order_id
const items = $input.all();
const seen = new Set();
const unique = [];
for (const item of items) {
const key = item.json.email; // หรือ field ที่ unique
if (!seen.has(key)) {
seen.add(key);
unique.push(item);
}
}
return unique;
Data Type Casting — แปลง Type
return $input.all().map(item => ({
json: {
...item.json,
price: parseFloat(item.json.price) || 0,
quantity: parseInt(item.json.quantity) || 0,
is_active: item.json.is_active === 'true' || item.json.is_active === true,
created_at: new Date(item.json.created_at).toISOString(),
}
}));
Phase 3: Load — โหลดข้อมูลลงปลายทาง
Strategy 1: Insert Only (Append)
เหมาะสำหรับ Log Table หรือข้อมูลที่ไม่เคย Update เช่น Transaction Records
INSERT INTO data_warehouse.transactions
(id, amount, user_id, created_at)
VALUES ({{ $json.id }}, {{ $json.amount }}, {{ $json.user_id }}, '{{ $json.created_at }}')
Strategy 2: Upsert (Insert หรือ Update)
เหมาะสำหรับ Sync ข้อมูล Master เช่น Users, Products ที่อาจมีการแก้ไข
INSERT INTO users (id, name, email, updated_at)
VALUES ({{ $json.id }}, '{{ $json.name }}', '{{ $json.email }}', NOW())
ON DUPLICATE KEY UPDATE
name = VALUES(name),
email = VALUES(email),
updated_at = VALUES(updated_at)
Strategy 3: Truncate and Reload
ลบข้อมูลเก่าแล้ว Insert ใหม่ทั้งหมด เหมาะสำหรับตารางขนาดเล็กที่ต้องการความแน่ใจว่าข้อมูลตรงกับ Source เสมอ
ระวัง: วิธีนี้ทำให้ข้อมูลหายชั่วคราวระหว่าง Truncate และ Insert ไม่เหมาะกับระบบที่มีการ Query ตลอดเวลา
จัดการ Error ใน ETL Pipeline
ETL Pipeline ต้องรับมือกับ Error อย่างชาญฉลาด เพราะถ้า Workflow หยุดกลางคันอาจทำให้ข้อมูลไม่สมบูรณ์
วิธีจัดการ Error ใน n8n:
- เปิด Continue On Fail ใน Node ที่อาจล้มเหลว เช่น HTTP Request หรือ Database Write
- ใช้ IF Node ตรวจสอบว่า Error เกิดขึ้นหรือไม่
- Log Error ลง Database หรือส่งแจ้งเตือนไป LINE/Slack
- Retry เฉพาะ Item ที่ล้มเหลวใน Round ถัดไป
ตัวอย่าง Error Handling Pattern:
[HTTP Request] → [IF: มี error?]
↓ Yes ↓ No
[Log to DB] [Process Data]
[Send Alert]
Tracking Last Run
เพื่อให้ Pipeline ดึงเฉพาะข้อมูลที่ใหม่กว่าครั้งที่แล้ว ต้องบันทึก Timestamp ของ Last Run ไว้
วิธีที่ 1: ใช้ n8n Variables
บันทึก last_run ลงใน n8n Variables แล้วอ่านในครั้งถัดไป
วิธีที่ 2: ใช้ State Table ใน Database
สร้างตาราง pipeline_state ที่เก็บ pipeline_name, last_run_at, last_processed_id แล้ว Update หลัง Load สำเร็จ
วิธีที่ 3: ใช้ Workflow ID + Static Data
n8n รองรับ Static Data ที่เก็บไว้ระหว่าง Execution ผ่าน $getWorkflowStaticData('global')
ตัวอย่าง Pipeline จริง: Sync ข้อมูลออร์เดอร์
โจทย์: ระบบขายออนไลน์เก็บข้อมูลใน MySQL, ต้องการ Sync ทุกชั่วโมงไป Google Sheets เพื่อให้ทีม Management ดูรายงาน
Workflow:
- Schedule Trigger — ทุกชั่วโมง
- MySQL Node — ดึง Orders ที่ updated_at > Last Run
- Code Node — แปลง Format วันที่ คำนวณยอดรวม เติมชื่อสถานะภาษาไทย
- Google Sheets Node — Append Rows ลง Sheet
- MySQL Node — Update Last Run Timestamp ใน State Table
- IF Node (Error Branch) — ถ้ามี Error ส่ง LINE แจ้งเตือน
Pipeline นี้ใช้เวลาสร้างไม่เกิน 1-2 ชั่วโมง แต่ให้ประโยชน์ทุกวันโดยที่ไม่ต้อง Export ด้วยมือ
อยากเรียน n8n แบบเป็นระบบ ตั้งแต่เริ่มต้นจนสร้าง Workflow ใช้งานจริงได้ ลองดู คอร์สสอน n8n ที่ aiunlock.co
Related posts
สร้างระบบ RAG ด้วย n8n: AI ตอบจากข้อมูลของคุณ
วิธีสร้างระบบ RAG (Retrieval-Augmented Generation) ด้วย n8n ให้ AI ตอบจากเอกสาร ฐานข้อมูล หรือ knowledge base ของคุณ
เชื่อมต่อ Vector Database กับ n8n: Pinecone, Qdrant, Supabase
เปรียบเทียบและวิธีเชื่อมต่อ Vector Database กับ n8n ครบทุกตัว ทั้ง Pinecone, Qdrant, Supabase pgvector และ Chroma
n8n Advanced: 10 เทคนิคขั้นสูงสำหรับมืออาชีพ
รวม 10 เทคนิค n8n ขั้นสูงที่มืออาชีพใช้จริง ตั้งแต่ Error Handling, Sub-Workflow, Code Node ไปจนถึง API Pagination