RxJS Operators คือฟังก์ชันสำหรับแปลง กรอง รวม หรือจัดการข้อผิดพลาดของ Observable โดยใช้งานผ่าน pipe() เพื่อประกอบขั้นตอนการประมวลผลข้อมูลแบบ Stream
%%{init: {"theme": "base", "themeVariables": {"primaryColor": "#fabd2f", "primaryTextColor": "#282828", "primaryBorderColor": "#b57614", "lineColor": "#7c6f64", "secondaryColor": "#83a598", "tertiaryColor": "#b8bb26", "background": "#fbf1c7", "mainBkg": "#ebdbb2", "fontFamily": "Tahoma, sans-serif"}}}%%
flowchart LR
subgraph Era1["ยุค Array Methods / Synchronous Data"]
A["map/filter/reduce
ข้อมูลใน Array"]
end
subgraph Era2["ยุค Stream Operators / Async Stream"]
B["pipe()
ต่อ operator"]
C["map/filter/tap
แปลงและตรวจค่า"]
end
subgraph Era3["ยุค API Search / Request Control"]
D["debounceTime
ลด request"]
E["switchMap
ยกเลิก request เก่า"]
end
A --> B --> C --> D --> E
pipe() ใช้รวม Operators หลายตัวต่อกันmap(value => transform) แปลงค่าของ Observablefilter(value => condition) กรองค่าที่ผ่านเงื่อนไขtap(value => sideEffect) ดูค่าโดยไม่เปลี่ยนข้อมูลswitchMap() แปลงเป็น Observable ใหม่และยกเลิก Request เก่าmergeMap() ทำงานพร้อมกันหลาย StreamconcatMap() ทำงานตามลำดับทีละ StreamcatchError(err => of(fallback)) จัดการ Error ใน StreamdebounceTime() และ distinctUntilChanged() เหมาะกับ Search Input| Operator | การจัดการ Request | ลำดับผลลัพธ์ | เหมาะกับ |
|---|---|---|---|
switchMap |
ยกเลิกของเก่า | เอาล่าสุด | Search, autocomplete |
mergeMap |
ทำพร้อมกัน | ไม่รับประกันลำดับ | งาน parallel |
concatMap |
ต่อคิวทีละงาน | รักษาลำดับ | Save หลายขั้น |
exhaustMap |
เมินงานใหม่จนงานเก่าเสร็จ | กันกดซ้ำ | Login/Submit |
%%{init: {"theme": "base", "themeVariables": {"primaryColor": "#fabd2f", "primaryTextColor": "#282828", "primaryBorderColor": "#b57614", "lineColor": "#7c6f64", "secondaryColor": "#83a598", "tertiaryColor": "#b8bb26", "background": "#fbf1c7", "mainBkg": "#ebdbb2", "fontFamily": "Tahoma, sans-serif"}}}%%
flowchart LR
A["Search Input
คำค้น"] --> B["debounceTime
รอพิมพ์หยุด"]
B --> C["distinctUntilChanged
กันค่าซ้ำ"]
C --> D["switchMap
เรียก API ล่าสุด"]
D --> E["catchError
จัดการ Error"]
E --> F["Result List
แสดงผล"]
debounceTime และ distinctUntilChanged// search.component.ts
// ตัวอย่าง search input ด้วย RxJS operators
import { Component } from '@angular/core';
import { FormControl } from '@angular/forms';
import { catchError, debounceTime, distinctUntilChanged, of, switchMap, tap } from 'rxjs';
import { UserApiService, User } from './user-api.service';
@Component({
selector: 'app-search',
template: `
<input [formControl]="searchControl" placeholder="ค้นหาผู้ใช้">
<ul>
<li *ngFor="let user of users">{{ user.name }}</li>
</ul>
`
})
export class SearchComponent {
searchControl = new FormControl('');
users: User[] = [];
constructor(private userApi: UserApiService) {
this.searchControl.valueChanges.pipe(
debounceTime(300),
distinctUntilChanged(),
tap(keyword => console.log('search:', keyword)),
switchMap(keyword => this.userApi.searchUsers(keyword ?? '').pipe(
catchError(() => of([]))
))
).subscribe(result => {
this.users = result;
});
}
}
// ตัวอย่างการใช้งาน:
// เมื่อผู้ใช้พิมพ์ ระบบจะรอ 300ms แล้วเรียก API ล่าสุดเท่านั้น
debounceTime และ distinctUntilChangedswitchMapcatchError