Celery Là Gì

Trong dự án hiện nay của bản thân khi đến phần scaling khối hệ thống thì phong cách thiết kế hiện giờ theo phía microservice gặp mặt yêu cầu một vấn đề: đều service vào hệ thống số đông ảnh hưởng thẳng cùng với database đề xuất xẩy ra vụ việc càng những service thì sẽ càng các kết nối tới database dẫn cho triệu chứng xẩy ra deadloông chồng, performance cũng tương đối chậm rãi do các kết nối cho tới database từ các service bắt buộc đợi nhau giải phóng.

Bạn đang xem: Celery là gì

Sau khi được lưu ý về việc chuyển sang cần sử dụng hàng ngóng nuốm do nhằm những service thao tác làm việc trực tiếp với database, mình tất cả dành thời gian tham khảo thêm về bản vẽ xây dựng Queue. Do dự án chạy đa số bởi pyeo hẹp phải tech lead gợi ý thực hiện Celery, một khối hệ thống thống trị queue phổ biến.

Kiến trúc sau thời điểm chuyển quý phái sử dụng queue trong hệ thống của bản thân đang như sau. Một bài viết hơi chi tiết về một dạng thiết kế queue là message queue phần đông bạn rất có thể đọc thêm làm việc toidicodedao


*

Về CeleryLà một hệ thống quản lý sản phẩm hóng xử trí task thời hạn thực. Trong khối hệ thống Celery chúng ta đã áp dụng có mang task y như job sống một vài framework khác ví như Sidekiq.Input của celery cần liên kết với cùng một một số loại message broker còn output rất có thể liên kết cho tới một hệ thống backkết thúc nhằm lưu trữ kết quả

Mọi bạn rất có thể tham khảo một nội dung bài viết không giống về Celery trên realchampionshipwrestling.com tại đây. Dường như Celery cũng có một hệ thống document cụ thể cùng dễ đọc sinh hoạt Home https://docs.celeryproject.org/en/latest/getting-started/introduction.html.

Các bài toán đề nghị sử dụng CeleryChạy background jobsChạy các job lập lịchTính tân oán phân tánXử lý song songCác công dụng thiết yếu Celery cung cấpMonitor: đo lường những job/task được gửi vào queueScheduling: chạy các task lập lịch (tương đương cronjob)Workflows: tạo một luồng xử lý taskTime và Rate Limits: điều hành và kiểm soát con số task được triển khai trong một khoảng chừng thời gian, thời hạn một task được chạy,...Resource Leak Protection: kiểm soát điều hành tài nguyên ổn trong quy trình cách xử trí taskUser Component: chất nhận được người tiêu dùng trường đoản cú customize những worker.Cơ chế của CeleryCelery hoạt động dựa trên có mang task queue. Đây là chế độ queue dùng để điều phối hận các job/work thân những thứ khác biệt. Các worker vẫn nhận task, chạy task với trả về công dụng.Input của queue:TaskCác process trên từng worker đang theo dõi và quan sát queue để thực hiện những task mới được đẩy vào queueCelery hay được sử dụng một message broker để điều păn năn task giữa những clients và worker. Để chế tác một task new client sẽ thêm một message vào queue, broker tiếp nối sẽ chuyển message này cho tới worker. Celery cung ứng 3 một số loại broker:RabbitMQRedisSQSMột hệ thống áp dụng celery có thể có tương đối nhiều workers với brokers, dựa vào vậy Việc scale theo chiều ngang sẽ rất dễ dãi.Các module chủ yếu của Celery

Application

Một instance được khởi chế tác từ tlỗi viện Celery được hotline là application

phần lớn Celery application rất có thể cùng lâu dài vào một process

Khởi chế tạo một celery application:

from celery import Celerytiện ích = Celery()Khi gửi một message tới queue, message đó sẽ chỉ chứa thương hiệu của task phải triển khai.

Các celery worker sẽ maps giữa thương hiệu của task với hàm thực hiện task kia, vấn đề mapping như thế được call là task registry


ứng dụng.taskdef add(x, y):return x + y

Tasks

Task trong Celery bao gồm nhì trách nhiệm chính:có mang hầu như gì vẫn xẩy ra sau thời điểm một task được Gọi (gửi đi message)quan niệm phần nhiều gì đang xẩy ra lúc một worker nhận thấy message đóMỗi task tất cả một thương hiệu riêng rẽ ko giống nhau, tên này sẽ tiến hành refer trong message nhằm worker có thể kiếm được đúng hàm nhằm triển khai. Nếu ko định nghĩa tên cho task thì task đó sẽ tiến hành từ đánh tên nhờ vào module cơ mà task được khái niệm và tên function của task.Các message của task sẽ không biến thành xóa sổ queue chừng nào message kia không được một worker xử lý. Một worker rất có thể cách xử trí nhiều message, nếu worker bị crash nhưng mà chưa xử lý không còn những message đó thì chúng vẫn có thể được gửi lại tới một worker khácCác function của task đề xuất ngơi nghỉ tâm trạng idempotent: function không gây ra ảnh hưởng gì bao gồm cả khi tất cả bị Call các lần với một tmê mệt số => một task vẫn thực thi vẫn bảo vệ không biến thành chạy lại đợt tiếp nhữa.

Tạo task

Để tạo thành task chúng ta dùng decorator
phầm mềm.task(name="create_new_user")def create_user(username, password):User.objects.create(username=username, password=password)Để task rất có thể retry chúng ta cũng có thể bound task vào chủ yếu instance của nó


task(bind=True)def add(self, x, y):logger.info(self.request.id)Task cũng hoàn toàn có thể kế thừa

import celeryclass MyTask(celery.Task):def on_failure(self, exc, task_id, args, kwargs, einfo): print("0!r failed: 1!r".format(task_id, exc))
task(base=MyTask)def add(x, y):raise KeyError()Để hiểu thêm công bố với trạng thái của task chúng ta cũng có thể thực hiện Task.request


app.task(bind=True)def dump_context(self, x, y):print("Executing task id 0.id, args: 0.args!r kwargs: 0.kwargs!r".format( self.request))Celery làm chủ tâm lý của tasks cùng hoàn toàn có thể giữ bọn chúng trong số hệ thống call là result backkết thúc. Vòng đời khoác định của task vào Celery gồm:

PENDING: task đợi được xúc tiến.

Xem thêm: " Bé Xuân Mai Sinh Năm Bao Nhiêu, Xuân Mai (Ca Sĩ)

STARTED: task vẫn khởi chạy

SUCCESS: task sẽ chạy thành công

FAILURE: task gặp gỡ lỗi sau thời điểm khởi chạy

RETRY: task đang rất được chạy lại

REVOKED: task được thu hồi lại

Ngoài những tâm trạng mang định bên trên bạn cũng có thể từ định nghĩa thêm tâm lý với cập nhật trạng thái đến task bởi method update_state


phầm mềm.task(bind=True)def upload_files(self, filenames):for i, tệp tin in enumerate(filenames): if not self.request.called_directly: self.update_state(state="PROGRESS", meta="current": i, "total": len(filenames))

Call task

Celery cung cấp các API để Gọi task sau khi đã quan niệm bọn chúng làm việc bên trên.

3 method chính:

apply_async: gửi task message.delay: gửi task messagecalling: task message sẽ không còn được gửi đi tới worker mà task sẽ tiến hành tiến hành luôn vị process ngày nay.

Có một task nlỗi sau:


tiện ích.taskdef add(x, y):return x + yĐể Gọi task này họ đã thử dùng 2 method là apply_async và delay

Với delay chúng ta đang viết nlỗi sau:

# task.delay(arg1, arg2, kwarg1="x", kwarg2="y")add.delay(10, 5)add.delay(a=10, b=5)Dùng apply_async thì yêu cầu viết tinh vi rộng một chút# task.apply_async(args=, kwargs="kwarg1": "x", "kwarg2": "y")add.apply_async(queue="low_priority", args=(10, 5))add.apply_async(queue="high_priority", kwargs="a": 10, "b": 5)Về thực chất delay với apply_async là hệt nhau nhưng lại delay đã tất cả sẵn những tùy chỉnh cấu hình khoác định với chúng ta chỉ hoàn toàn có thể truyền vào mọi tsay đắm số nên vẫn có mang vào function của task, còn cùng với apply_async bạn có thể truyền thêm những tđắm đuối số khác như queue họ mong gửi message vào,.... Best practice là buộc phải sử dụng apply_async nhằm tiện vấn đề config chạy task tùy theo nhu cầu thực hiện.

Celery cung ứng bài toán điện thoại tư vấn task theo dạng chaining, tác dụng của task này hoàn toàn có thể được truyền vào task tiếp theo

add.apply_async((2, 2), link=add.s(16)) # 20Nhờ vào phép tắc này chúng ta cũng có thể kiến thiết callback đến task nlỗi sau
phầm mềm.taskdef error_handler(uuid):result = AsyncResult(uuid)exc = result.get(propagate=False)print("Task 0 raised exception: 1!r 2!r".format( uuid, exc, result.traceback))add.apply_async((2, 2), link_error=error_handler.s())Sử dụng Celery

Cài đặt

pip install -U Celery

Sử dụng

Lựa lựa chọn các loại message broker phù hợp cùng với dự án. Nhỏng vẫn nói trên Celery cung cấp 3 các loại message broker là RabbitMQ, Redis, SQS. Mình sẽ đi sâu vào đối chiếu từng nhiều loại message broker vào phần sau về Celery.

Tạo một celery worker cùng với task add

from celery Import Celeryphầm mềm = Celery("name of module", broker="url_of_broker")
ứng dụng.taskdef add(x, y):return x + yChạy worker

$ celery -A tasks worker --loglevel=infogọi task

Lưu kết quả

Celery có thể bảo quản tâm lý của tasks nếu như chúng ta bắt buộc theo dõi tasks sau đây. Với các khối hệ thống tiến hành task theo thủ tục state machine thì câu hỏi hệ thống cần cố kỉnh được luồng tinh thần của task là khôn xiết quan trọng.

Các khối hệ thống celery dùng để làm giữ trạng thái task:

SQLAlchemyMemcachedRedis

Để áp dụng hình thức lưu lại tác dụng trong Celery họ knhị báo celery worker gồm tsi số backkết thúc. Tại phía trên mình thực hiện redis cho cả Việc lưu lại công dụng task lẫn có tác dụng message broker

app = Celery("tasks", backend="redis://localhost", broker="redis://localhost:6379/0")

Cấu hình Celery

Cấu hình mang định cơ bạn dạng của celery:

## Broker settings.broker_url = "redis://localhost:6379/0"# List of modules lớn import when the Celery worker starts.imports = ("myphầm mềm.tasks",)## Using the database to store task state và results.result_backover = "db+sqlite:///results.db"task_annotations = "tasks.add": "rate_limit": "10/s"Best practice: sinh sản một tệp tin config riêng biệt cho celery celeryconfig.py

broker_url = "redis://localhost:6379/0://"result_backkết thúc = "rpc://"task_serializer = "json"result_serializer = "json"accept_nội dung = <"json">timezone = "Europe/Oslo"enable_utc = Truetask_routes = "tasks.add": "low-priority", # routing một task cho tới queue ao ước muốnNgoài phương pháp tạo file config trên ra chúng ta cũng có thể config thẳng bởi application của Celery tiện ích.conf

tiện ích.conf.update(enable_utc=True, timezone="Europe/London",)Tổng kếtCelery không cần thiết phải config các mà chỉ việc import trường đoản cú module sử dụng trực tiếp nhỏng sau

from celery Import Celeryapp = Celery("name of module", broker="url_of_broker")Worker cùng client của Celery hoàn toàn có thể trường đoản cú retry

Một process của Celery hoàn toàn có thể cách xử trí hàng nghìn task vào một phút ít với độ trễ chỉ vài miligiây

Celery hỗ trợ:

Message brokers:RabbitMQRedisSQSXử lý concurrencymultiprocessingmultithreadsingle threadeventlet, geventLưu trữ tác dụng bên trên các hệ thống:AmqpRedisMemcachedSQLAlchemyAmazon S3File systemSerializationjsonyaml

Ở phần sau nội dung bài viết bản thân sẽ đi sâu rộng về worker trong Celery với nhì loại message broker mà Celery hỗ trợ: SQS - Redis, đôi khi dựng một vận dụng cơ bạn dạng sử dụng hệ thống này.