مقدمه
در درس قبل، ما یک وب سرور ساده ساختیم که به درخواستها به صورت ترتیبی پاسخ میداد. این مدل یک
محدودیت بزرگ دارد: اگر یک درخواست زمانبر باشد، تمام درخواستهای بعدی باید منتظر بمانند تا آن
درخواست تمام شود. این مشکل سرور ما را در دنیای واقعی غیرقابل استفاده میکند.
یک راه حل ساده این است که برای هر درخواست ورودی، یک ترِد جدید با thread::spawn ایجاد کنیم. این روش کار میکند، اما میتواند منجر به
مشکلات عملکردی شود. ایجاد بیرویه ترِدها میتواند منابع سیستم را مصرف کرده و حتی باعث کند شدن
سرور شود. یک راه حل بهتر و استانداردتر، استفاده از یک «استخر ترِد» یا Thread Pool است.
پیادهسازی یک Thread Pool
یک Thread Pool مجموعهای از ترِدهای از پیش ایجاد شده است که در حالت آمادهباش قرار دارند. وقتی
یک کار جدید (در اینجا، یک درخواست ورودی) از راه میرسد، به جای ایجاد یک ترِد جدید، کار به یکی از
ترِدهای آزاد در استخر داده میشود تا آن را انجام دهد. این کار از هزینه سربار ایجاد و از بین بردن
مداوم ترِدها جلوگیری میکند.
ما یک struct جدید به نام ThreadPool در فایل src/lib.rs خواهیم ساخت. این struct
مسئول ایجاد و مدیریت ترِدها و توزیع کار بین آنها خواهد بود.
src/lib.rs
use std::{sync::{mpsc, Arc, Mutex}, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
});
Worker { id, thread }
}
}
این کد کمی پیچیده است، اما بیایید آن را تجزیه کنیم. ThreadPool یک وکتور از Workerها و یک
فرستنده کانال (sender) را نگه میدارد. هر Worker یک ترِد است که در یک حلقه بینهایت
منتظر دریافت یک «کار» (Job) از کانال است. یک Job در اینجا یک کلوژر است که قرار است اجرا
شود. وقتی ما متد thread_pool.execute() را فراخوانی میکنیم، کلوژر
داده شده را از طریق کانال به یکی از Workerهای آزاد ارسال میکنیم. ما از Arc<Mutex<T>> برای
به اشتراکگذاری ایمن گیرنده کانال (receiver) بین تمام Workerها استفاده کردهایم.
استفاده از Thread Pool در سرور
اکنون میتوانیم از ThreadPool خود در تابع main استفاده کنیم تا هر اتصال ورودی را در یک
ترِد جداگانه مدیریت کنیم.
src/main.rs
use std::net::TcpListener;
use hello::ThreadPool;
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
اکنون، به جای اینکه handle_connection را مستقیماً در حلقه اصلی فراخوانی کنیم، ما یک کلوژر که
handle_connection را فراخوانی میکند به ThreadPool خود میدهیم. استخر ترِد این کار را به یکی
از ترِدهای آزاد خود محول میکند و ترِد اصلی بلافاصله آزاد میشود تا به درخواست بعدی گوش دهد. این
کار به سرور ما اجازه میدهد تا چندین درخواست را به صورت همزمان پردازش کند.
نتیجهگیری
در این درس، با تبدیل سرور تک-ریسمانی خود به یک سرور چند-ریسمانی با استفاده از الگوی Thread
Pool، عملکرد آن را به شکل چشمگیری بهبود دادیم. این رویکرد به ما اجازه میدهد تا از هستههای
پردازشی متعدد به صورت بهینه استفاده کرده و به چندین کلاینت به صورت همزمان سرویس دهیم.
با این حال، سرور ما هنوز یک مشکل دارد: راهی برای خاموش کردن آن به صورت کنترلشده (graceful
shutdown) وجود ندارد. در درس پایانی این فصل و این دوره، به «پیادهسازی Graceful Shutdown» خواهیم
پرداخت و یاد میگیریم که چگونه به سرور خود اجازه دهیم تا قبل از خاموش شدن، تمام کارهای در حال
انجام را به پایان برساند.