<?php
declare( strict_types=1 );
namespace Automattic\WooCommerce\ActionSchedulerJobFramework;
use ActionScheduler_Action;
use Automattic\WooCommerce\ActionSchedulerJobFramework\Utilities\BatchSize;
use Exception;
// Stop executing this file immediately when the ABSPATH constant is not defined.
// NOTE(review): ABSPATH is not defined in this file; presumably it is set by the WordPress bootstrap, so this blocks direct requests to the file — confirm.
defined( 'ABSPATH' ) || exit;
/**
* Class AbstractChainedJob.
*
* A "chained job" is a kind of batched job that creates follow-up actions until all items in the job have been processed.
*
* Each "batch" in the job is a separate "scheduled action". Each batch is numbered and should be limited to process a
* set number of items.
*
* Only a single chained job can run at any one time.
*
* @since 1.0.0
*/
abstract class AbstractChainedJob extends AbstractJob implements ChainedJobInterface {

	use BatchSize;

	/**
	 * Get a set of items for the batch.
	 *
	 * NOTE: when using an OFFSET based query to retrieve items it's recommended to order by the item ID while
	 * ASCENDING. This is so that any newly added items will not disrupt the query offset.
	 *
	 * @param int   $batch_number The batch number. Starts at 1 and increments for each new batch in the job cycle.
	 * @param array $args         The args for the job.
	 *
	 * @return array The items to process in this batch. Returning an empty array ends the job chain.
	 *
	 * @throws Exception On error. The failure will be logged by Action Scheduler and the job chain will stop.
	 */
	abstract protected function get_items_for_batch( int $batch_number, array $args ): array;

	/**
	 * Process a single item.
	 *
	 * @param string|int|array $item A single item from the get_items_for_batch() method.
	 * @param array            $args The args for the job.
	 *
	 * @throws Exception On error. The failure will be logged by Action Scheduler and the job chain will stop.
	 */
	abstract protected function process_item( $item, array $args );

	/**
	 * Called before starting the job, right before the first batch is queued.
	 */
	protected function handle_start() {
		// Optionally override this method in child class.
	}

	/**
	 * Called after finishing the job, once the "end" action of the chain runs.
	 */
	protected function handle_end() {
		// Optionally override this method in child class.
	}

	/**
	 * Init the job, register necessary WP actions.
	 *
	 * Hooks one handler per chain stage (start, batch, end). The batch handler accepts two
	 * arguments: the batch number and the job args.
	 */
	public function init() {
		add_action( $this->get_action_full_name( self::CHAIN_START ), [ $this, 'handle_start_action' ] );
		add_action( $this->get_action_full_name( self::CHAIN_BATCH ), [ $this, 'handle_batch_action' ], 10, 2 );
		add_action( $this->get_action_full_name( self::CHAIN_END ), [ $this, 'handle_end_action' ] );
	}

	/**
	 * Queue the job to be started in the background.
	 *
	 * @param array $args The args for the job.
	 */
	public function queue_start( array $args = [] ) {
		$this->schedule_immediate_action( self::CHAIN_START, [ $args ] );
	}

	/**
	 * Queue a batch to be processed immediately.
	 *
	 * @param int   $batch_number The batch number for the new batch.
	 * @param array $args         The args for the job.
	 */
	protected function queue_batch( int $batch_number, array $args ) {
		// The batch number must stay the first action arg: progress reporting reads it back from there.
		$this->schedule_immediate_action( self::CHAIN_BATCH, [ $batch_number, $args ] );
	}

	/**
	 * Queue the job to be ended.
	 *
	 * Should be called once all items are processed.
	 *
	 * @param array $args The args for the job.
	 */
	protected function queue_end( array $args = [] ) {
		$this->schedule_immediate_action( self::CHAIN_END, [ $args ] );
	}

	/**
	 * Handles job start action.
	 *
	 * @hooked {plugin_name}/jobs/{job_name}/chain_start
	 *
	 * @param array $args The args for the job.
	 *
	 * @throws Exception On error. The failure will be logged by Action Scheduler and the job chain will stop.
	 */
	public function handle_start_action( array $args ) {
		// Prevent starting if a job already has scheduled batch actions.
		// is_running() is deliberately not used here: it also looks at "start" actions, and this
		// handler is itself invoked by one.
		$batch_action_name = $this->get_action_full_name( self::CHAIN_BATCH );

		if ( $this->action_scheduler->next_scheduled_action( $batch_action_name, null, $this->get_group_name() ) ) {
			throw new Exception( 'This job is already running.' );
		}

		$this->handle_start();
		$this->queue_batch( 1, $args );
	}

	/**
	 * Handle processing a chain batch.
	 *
	 * Fetches the items for the batch, processes them and queues the next batch. When there are
	 * no items left, the end of the chain is queued instead.
	 *
	 * @hooked {plugin_name}/jobs/{job_name}/chain_batch
	 *
	 * @param int   $batch_number The batch number for the new batch.
	 * @param array $args         The args for the job.
	 *
	 * @throws Exception On error. The failure will be logged by Action Scheduler and the job chain will stop.
	 */
	public function handle_batch_action( int $batch_number, array $args ) {
		$items = $this->get_items_for_batch( $batch_number, $args );

		if ( empty( $items ) ) {
			// No more items to process so end the job chain.
			$this->queue_end( $args );
			return;
		}

		$this->process_items( $items, $args );

		// There were items, so queue another batch. This is only reached if processing did not throw.
		$this->queue_batch( $batch_number + 1, $args );
	}

	/**
	 * Processes a batch of items.
	 *
	 * @since 1.1.0
	 *
	 * @param array $items The items of the current batch.
	 * @param array $args  The args for the job.
	 *
	 * @throws Exception On error. The failure will be logged by Action Scheduler and the job chain will stop.
	 */
	protected function process_items( array $items, array $args ) {
		foreach ( $items as $item ) {
			$this->process_item( $item, $args );
		}
	}

	/**
	 * Handles job end action.
	 *
	 * @hooked {plugin_name}/jobs/{job_name}/chain_end
	 *
	 * @param array $args The args for the job. Accepted to match the hook signature; not passed on to handle_end().
	 *
	 * @throws Exception On error. The failure will be logged by Action Scheduler.
	 */
	public function handle_end_action( array $args ) {
		$this->handle_end();
	}

	/**
	 * Check if this job is running.
	 *
	 * Checks if there is any "start" or "batch" actions pending or in-progress for this job.
	 *
	 * @return bool
	 */
	public function is_running(): bool {
		$group_name = $this->get_group_name();

		foreach ( [ self::CHAIN_START, self::CHAIN_BATCH ] as $chain_action ) {
			$hook = $this->get_action_full_name( $chain_action );

			if ( $this->action_scheduler->next_scheduled_action( $hook, null, $group_name ) ) {
				return true;
			}
		}

		return false;
	}

	/**
	 * Get the number of items processed by the currently running job.
	 *
	 * Looks for the batch action that is currently running and, failing that, for one that is
	 * pending. The count is derived from that action's batch number.
	 *
	 * @return int Returns the number of items processed. Will return zero if the job isn't running.
	 */
	public function get_number_of_items_processed(): int {
		$batch_action_name = $this->get_action_full_name( self::CHAIN_BATCH );

		// Order matters: a running batch takes precedence over a pending one.
		$statuses = [
			$this->action_scheduler::STATUS_RUNNING,
			$this->action_scheduler::STATUS_PENDING,
		];

		foreach ( $statuses as $status ) {
			$actions = $this->action_scheduler->search(
				[
					'hook'     => $batch_action_name,
					'per_page' => 1,
					'status'   => $status,
				]
			);

			if ( $actions ) {
				return $this->calculate_items_processed_from_batch_action( current( $actions ) );
			}
		}

		return 0;
	}

	/**
	 * Calculate the number of items processed by the job based on a given scheduled batch action.
	 *
	 * The result is an estimate: it multiplies the configured batch size by the number of batches
	 * that precede the given one, so it assumes each of those batches handled a full batch of items.
	 *
	 * @param ActionScheduler_Action $action The most recent batch action.
	 *
	 * @return int
	 */
	protected function calculate_items_processed_from_batch_action( ActionScheduler_Action $action ): int {
		$args = $action->get_args();

		// The batch number is the first action arg. Fall back to 1 (nothing processed yet) when it is
		// missing, and cast so that a numeric string or float cannot violate the int return type.
		$batch_number = (int) ( $args[0] ?? 1 );

		// Subtract 1 because the given batch has not been fully processed yet.
		$number_of_batches_processed = $batch_number - 1;

		// Use max() to not allow a negative value.
		return max( 0, $this->get_batch_size() * $number_of_batches_processed );
	}
}