Introduction to Using Queues in the Laravel Framework (with Code)

01-28-2024

This article introduces how queues are used in the Laravel framework, with code examples. In real project development we often need a lightweight queue, for example to send SMS messages or emails. Such tasks do not justify a heavyweight message queue such as Kafka or RabbitMQ, but they do need asynchronous execution, retries, and concurrency control. Redis, Beanstalkd, and Amazon SQS are commonly used for this. Laravel provides a unified API across these different queue backends. This article covers the most widely used of them, the Redis queue.

Before looking at Laravel's queue service, let's first look at how a queue can be built on Redis. Redis was originally designed for caching, but its data structures also make it usable as a message queue.

Data structures of a Redis queue

List

A Redis list is implemented as a linked list, which makes message-queue behavior such as FIFO (first-in, first-out) easy to achieve: a single list object is enough, with data pushed onto the tail and fetched from the head.

Related commands: (1) In on the left, out on the right: LPUSH/RPOP; (2) In on the right, out on the left: RPUSH/LPOP.

This simple message queue is easy to implement.
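To make the FIFO behavior concrete, here is a minimal sketch in plain PHP. The ListQueue class is a hypothetical in-memory stand-in that only mimics the semantics of RPUSH and LPOP; it is not a Redis client and does not talk to a server.

```php
<?php
// Hypothetical in-memory stand-in for a Redis list; not a Redis client.
final class ListQueue
{
    /** @var string[] */
    private array $items = [];

    // Like RPUSH: append to the tail and return the new length.
    public function rpush(string $payload): int
    {
        $this->items[] = $payload;

        return count($this->items);
    }

    // Like LPOP: remove and return the head, or null when the list is empty.
    public function lpop(): ?string
    {
        return array_shift($this->items);
    }
}

$queue = new ListQueue();
$queue->rpush('send-sms');
$queue->rpush('send-email');

$first = $queue->lpop();  // the oldest task comes out first
$second = $queue->lpop();
$third = $queue->lpop();  // null: nothing left
```

Pushing on one end and popping from the other is all that FIFO requires, which is why a single list is enough for immediate tasks.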

Zset (sorted set)

Some tasks do not need to run immediately but after a delay; others are important and must be retried when they fail. A list alone cannot provide these features; this is where the Redis sorted set comes in.

A Redis sorted set is similar to a Redis set in that it is a collection of unique strings. The difference is that each member of a sorted set is associated with a score, which is used to order the members from the lowest score to the highest.

A sorted set has no built-in notion of a delayed task, but you can use the time at which a delayed task should start as its score, then poll the sorted set and take out the tasks that have come due, which gives you delayed tasks.

For important tasks that need to be retried, the task is placed in a sorted set before it is executed, with its score set to the latest time by which it should finish. If the task succeeds, it is deleted from the sorted set. If it does not finish within that time, it is moved from the sorted set back to the queue.

Related commands:

(1) ZADD adds one or more members to a sorted set, or updates the score of a member that already exists.

(2) ZRANGEBYSCORE returns the members of a sorted set whose scores fall within a given range.

(3) ZREMRANGEBYRANK removes all members of a sorted set whose rank (index) falls within a given range.
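The delayed-task idea can be sketched in plain PHP as follows. DelayedSet is a hypothetical in-memory model of a sorted set (member mapped to score), and popDue is an illustrative name that combines "read everything with a score up to now" with "remove what was read". A real Redis-backed version would have to make those two steps atomic, which this sketch does not attempt.

```php
<?php
// Hypothetical in-memory stand-in for a Redis sorted set; not a Redis client.
final class DelayedSet
{
    /** @var array<string, int> member => score (time the task becomes due) */
    private array $scores = [];

    // Like ZADD: add a member, or update its score if it already exists.
    public function zadd(int $score, string $member): void
    {
        $this->scores[$member] = $score;
    }

    // Roughly ZRANGEBYSCORE from -inf to $now, then removal of what was returned.
    public function popDue(int $now): array
    {
        $due = array_filter($this->scores, fn (int $score): bool => $score <= $now);
        asort($due); // lowest score (earliest due time) first

        foreach (array_keys($due) as $member) {
            unset($this->scores[$member]);
        }

        return array_keys($due);
    }
}

$delayed = new DelayedSet();
$delayed->zadd(1100, 'send-report');
$delayed->zadd(1010, 'send-reminder');
$delayed->zadd(1500, 'cleanup');

$dueAt1200 = $delayed->popDue(1200); // the two tasks due by 1200, earliest first
$dueAt1300 = $delayed->popDue(1300); // nothing new is due yet
```

The timestamps are small made-up integers to keep the example readable; in practice the score would be a Unix timestamp.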

Task scheduling in the Laravel queue service

The task scheduling process of the queue service is as follows:

Laravel's queue service is driven by two processes: a producer and a consumer. They operate on three Redis queues: one List, which holds immediate tasks, and two Zsets, which hold delayed tasks and reserved tasks (tasks currently being processed).

The producer pushes tasks to Redis. An immediate task is pushed to queues:default by default; a delayed task is pushed to queues:default:delayed.

The consumer polls these queues and keeps taking tasks off them. It first puts a task into queues:default:reserved and then executes it. If the task succeeds, it is deleted from queues:default:reserved; otherwise it is put back into queues:default:delayed.
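The interplay of the list, the delayed set, and the reserved set can be modeled in a few lines of plain PHP (8.0 or later). This is a simplified sketch, not Laravel's implementation: QueueModel and its method names are hypothetical, time is passed in explicitly as an integer, and the 90-second reservation window is an assumed value.

```php
<?php
// Simplified in-memory model of the three structures; not Laravel's actual code.
final class QueueModel
{
    private array $ready = [];    // list: immediate tasks
    private array $delayed = [];  // sorted set: task => time it becomes available
    private array $reserved = []; // sorted set: task => time its reservation expires

    public function __construct(private int $retryAfter = 90)
    {
    }

    // Producer: immediate task.
    public function push(string $task): void
    {
        $this->ready[] = $task;
    }

    // Producer: delayed task.
    public function later(string $task, int $availableAt): void
    {
        $this->delayed[$task] = $availableAt;
    }

    // Move tasks whose score has passed from a sorted set back onto the list.
    private function migrate(array &$set, int $now): void
    {
        foreach ($set as $task => $score) {
            if ($score <= $now) {
                unset($set[$task]);
                $this->ready[] = $task;
            }
        }
    }

    // Consumer: migrate due tasks, then reserve the head of the list.
    public function pop(int $now): ?string
    {
        $this->migrate($this->delayed, $now);
        $this->migrate($this->reserved, $now);

        $task = array_shift($this->ready);
        if ($task !== null) {
            $this->reserved[$task] = $now + $this->retryAfter;
        }

        return $task;
    }

    // Consumer: the task succeeded.
    public function delete(string $task): void
    {
        unset($this->reserved[$task]);
    }

    // Consumer: the task failed and should be retried later.
    public function release(string $task, int $availableAt): void
    {
        unset($this->reserved[$task]);
        $this->delayed[$task] = $availableAt;
    }
}

$q = new QueueModel(90);
$q->push('send-email');
$q->later('send-sms', 1060);

$a = $q->pop(1000);    // 'send-email' is reserved
$q->delete($a);        // it succeeded
$b = $q->pop(1000);    // null: 'send-sms' is not due yet
$c = $q->pop(1060);    // 'send-sms' is now due
$q->release($c, 1070); // it failed: back to the delayed set
$d = $q->pop(1070);    // 'send-sms' again
$e = $q->pop(1100);    // null: still reserved
$f = $q->pop(1160);    // the reservation expired, so the task comes back
```

The last two calls show why the reserved set matters: a task whose worker never reports back is not lost, it simply reappears once its reservation runs out.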

Overall process of the Laravel queue service

[Figure: task distribution process]

[Figure: task processor operation]

Creating tasks

Queue settings

'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => 'default',
    'retry_after' => 90,
],

These settings live in config/queue.php, and the default redis configuration generally looks like the above. connection is the name of the Redis connection defined in config/database.php; queue is the name of the queue in Redis. Note that with Redis Cluster the queue name must contain a key hash tag, i.e. {default}. When a task has been running for more than retry_after seconds, it is put back on the queue.
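As an illustration of the hash tag note, a cluster-ready variant of the connection might look like the fragment below. Only the queue value differs from the default. The comment reflects how Redis Cluster hashes just the part of a key inside the braces, so all keys derived from the queue name land in the same slot.

```php
'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    // The braces form a key hash tag: keys built from this queue name
    // (the list plus its delayed and reserved sets) hash to the same slot.
    'queue' => '{default}',
    'retry_after' => 90,
],
```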

Creating a task class

A task class has a very simple structure. It usually contains only a handle method, which the queue calls to run the task.

If you want the task to be pushed onto the queue instead of being executed synchronously, the class needs to implement the Illuminate\Contracts\Queue\ShouldQueue interface.

If you want the task to be pushed to a specific connection, such as redis or sqs, set the connection variable.

If you want the task to be pushed to a specific queue, set the queue variable.

If you want to delay pushing the task, set the delay variable.

If you want to set the maximum number of retries for a task, use the tries variable.

If you want to set the maximum number of seconds that a task can run, use the timeout variable.

If you want to interact with the queue manually from inside the task, use the Illuminate\Queue\InteractsWithQueue trait.
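Putting these options together, a task class might look like the sketch below. It assumes a standard Laravel application and cannot run outside one. The App\Jobs namespace, the concrete values (3 tries, 60 seconds, the processing queue, a 10-second delay) and the body of handle are illustrative assumptions. Because the Queueable trait already declares the connection, queue, and delay properties, the sketch sets them through the trait's helper methods in the constructor rather than redeclaring them.

```php
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    // Maximum number of attempts before the task is marked as failed.
    public $tries = 3;

    // Maximum number of seconds the task may run.
    public $timeout = 60;

    // Illustrative payload; the article only refers to it as $podcast.
    public $podcast;

    public function __construct($podcast)
    {
        $this->podcast = $podcast;

        // Assumed defaults for this task; callers can still override them
        // when dispatching.
        $this->onConnection('redis');
        $this->onQueue('processing');
        $this->delay(10); // seconds
    }

    // Called by the queue worker when the task is run.
    public function handle()
    {
        // attempts() is provided by InteractsWithQueue.
        if ($this->attempts() > 1) {
            // Placeholder: a retry could be logged or handled differently here.
        }

        // Process the podcast ...
    }
}
```

Keeping timeout below the connection's retry_after value (60 versus 90 seconds here) means a task is stopped before the queue considers it stalled and hands it to another worker.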

Dispatching tasks

Once the task class is written, you can dispatch it with its dispatch method. The arguments passed to dispatch are handed to the task's constructor:

class PodcastController extends Controller
{
    public function store(Request $request)
    {
        // Create a podcast ...

        ProcessPodcast::dispatch($podcast);
    }
}

If you want to delay the execution of a queued task, chain the delay method when dispatching it.

ProcessPodcast::dispatch($podcast)
    ->delay(Carbon::now()->addMinutes(10));

By pushing tasks to different queues, you can categorize your queued tasks and even prioritize how many workers are assigned to each queue. To specify a queue, chain the onQueue method when dispatching the task:

ProcessPodcast::dispatch($podcast)->onQueue('processing');

If you use multiple queue connections, you can push a task to a specific one. To specify the connection, chain the onConnection method when dispatching the task:

ProcessPodcast::dispatch($podcast)->onConnection('redis');
