File "Migration.php"

Full Path: /home/warrior1/public_html/languages/wp-content/plugins/mailpoet/lib/Cron/Workers/SendingQueue/Migration.php
File size: 9.72 KB
MIME-type: text/x-php
Charset: utf-8

<?php

namespace MailPoet\Cron\Workers\SendingQueue;

if (!defined('ABSPATH')) exit;


use MailPoet\Cron\Workers\SimpleWorker;
use MailPoet\Entities\ScheduledTaskEntity;
use MailPoet\Entities\ScheduledTaskSubscriberEntity;
use MailPoet\Entities\SendingQueueEntity;
use MailPoet\Mailer\MailerLog;
use MailPoet\Newsletter\Sending\ScheduledTaskSubscribersRepository;
use MailPoet\WP\Functions as WPFunctions;
use MailPoetVendor\Carbon\Carbon;
use MailPoetVendor\Doctrine\ORM\EntityManager;

class Migration extends SimpleWorker {
  const TASK_TYPE = 'migration';
  const BATCH_SIZE = 20;

  /** @var EntityManager */
  private $entityManager;

  /** @var ScheduledTaskSubscribersRepository */
  private $scheduledTaskSubscribersRepository;

  public function __construct(
    EntityManager $entityManager,
    ScheduledTaskSubscribersRepository $scheduledTaskSubscribersRepository
  ) {
    parent::__construct();
    $this->entityManager = $entityManager;
    $this->scheduledTaskSubscribersRepository = $scheduledTaskSubscribersRepository;
  }

  public function checkProcessingRequirements() {
    // if migration was completed, don't run it again
    $completedTasks = $this->getCompletedTasks();
    return empty($completedTasks);
  }

  public function prepareTaskStrategy(ScheduledTaskEntity $task, $timer) {
    $unmigratedColumns = $this->checkUnmigratedColumnsExist();
    $unmigratedQueuesCount = 0;
    $unmigratedQueueSubscribers = [];

    if ($unmigratedColumns) {
      $unmigratedQueuesCount = count($this->getUnmigratedQueueIds());
      $unmigratedQueueSubscribers = $this->getTaskIdsForUnmigratedSubscribers();
    }

    if (
      !$unmigratedColumns ||
      ($unmigratedQueuesCount == 0
      && count($unmigratedQueueSubscribers) == 0)
    ) {
      // nothing to migrate, complete task
      $task->setProcessedAt(Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp')));
      $task->setStatus(ScheduledTaskEntity::STATUS_COMPLETED);
      $this->scheduledTasksRepository->persist($task);
      $this->scheduledTasksRepository->flush();
      $this->resumeSending();
      return false;
    }

    // pause sending while the migration is in process
    $this->pauseSending();
    return true;
  }

  public function pauseSending() {
    $mailerLog = MailerLog::getMailerLog();
    if (MailerLog::isSendingPaused($mailerLog)) {
      // sending is already paused
      return false;
    }
    $mailerLog = MailerLog::setError(
      $mailerLog,
      'migration',
      __('Your sending queue data is being migrated to allow better performance, sending is paused while the migration is in progress and will resume automatically upon completion. This may take a few minutes.', 'mailpoet')
    );
    return MailerLog::pauseSending($mailerLog);
  }

  public function resumeSending() {
    $mailerLog = MailerLog::getMailerLog();
    if (!MailerLog::isSendingPaused($mailerLog)) {
      // sending is not paused
      return false;
    }
    $error = MailerLog::getError($mailerLog);
    // only resume sending if it was paused by migration
    if (isset($error['operation']) && $error['operation'] === 'migration') {
      return MailerLog::resumeSending();
    }
  }

  public function processTaskStrategy(ScheduledTaskEntity $task, $timer) {
    $this->migrateSendingQueues($timer);
    $this->migrateSubscribers($timer);
    $this->resumeSending();
    return true;
  }

  private function checkUnmigratedColumnsExist() {
    global $wpdb;
    $existingColumns = $wpdb->get_col('DESC ' . esc_sql($this->getTableName(SendingQueueEntity::class)));
    return in_array('type', $existingColumns);
  }

  /**
   * @return array
   */
  public function getUnmigratedQueueIds(): array {
    $sendingQueuesTable = $this->getTableName(SendingQueueEntity::class);
    return $this->entityManager->getConnection()->executeQuery("
      SELECT id
      FROM {$sendingQueuesTable}
      WHERE task_id = 0
        AND type IS NULL
    ")->fetchFirstColumn();
  }

  public function getTaskIdsForUnmigratedSubscribers() {
    global $wpdb;
    $query = sprintf(
      'SELECT queues.`task_id` FROM %1$s queues INNER JOIN %2$s tasks ON queues.`task_id` = tasks.`id` ' .
      'WHERE tasks.`type` = "sending" AND (tasks.`status` IS NULL OR tasks.`status` = "paused") ' .
      'AND queues.`subscribers` != "" AND queues.`subscribers` != "N;"' .
      'AND queues.`count_total` > (SELECT COUNT(*) FROM %3$s subs WHERE subs.`task_id` = queues.`task_id`)',
      esc_sql($this->getTableName(SendingQueueEntity::class)),
      esc_sql($this->getTableName(ScheduledTaskEntity::class)),
      esc_sql($this->getTableName(ScheduledTaskSubscriberEntity::class))
    );
    return $wpdb->get_col($query);
  }

  /*
   * Migrate all sending queues without converting subscriber data
   */
  public function migrateSendingQueues($timer) {
    global $wpdb;

    $queueIds = $this->getUnmigratedQueueIds();

    $columnList = [
      'status',
      'priority',
      'scheduled_at',
      'processed_at',
      'created_at',
      'updated_at',
      'deleted_at',
    ];

    if (!empty($queueIds)) {
      foreach (array_chunk($queueIds, self::BATCH_SIZE) as $queueBatch) {
        // abort if execution limit is reached
        $this->cronHelper->enforceExecutionLimit($timer);

        foreach ($queueBatch as $queueId) {
          // create a new scheduled task of type "sending"

          // Constants are safe, queue ID is cast to int.
          // phpcs:ignore WordPressDotOrg.sniffs.DirectDB.UnescapedDBParameter
          $wpdb->query(sprintf(
            'INSERT IGNORE INTO %1$s (`type`, %2$s) ' .
            'SELECT "sending", %2$s FROM %3$s WHERE `id` = %4$s',
            $this->getTableName(ScheduledTaskEntity::class),
            '`' . join('`, `', $columnList) . '`',
            $this->getTableName(SendingQueueEntity::class),
            $queueId
          ));

          // link the queue with the task via task_id
          $newTaskId = $wpdb->insert_id; // phpcs:ignore Squiz.NamingConventions.ValidVariableName.MemberNotCamelCaps
          $table = esc_sql($this->getTableName(SendingQueueEntity::class));
          $query = $wpdb->prepare(
            "UPDATE `$table` SET `task_id` = %s WHERE `id` = %s",
            $newTaskId,
            $queueId
          );
          $wpdb->query($query);
        }
      }
    }

    return true;
  }

  /*
   * Migrate subscribers for in-progress sending tasks from the `subscribers` field to a separate table
   */
  public function migrateSubscribers($timer) {
    global $wpdb;

    // find in-progress queues that have serialized subscribers
    $taskIds = $this->getTaskIdsForUnmigratedSubscribers();

    // check if subscribers for each one were already migrated
    if (!empty($taskIds)) {
      $taskIds = $wpdb->get_col(sprintf(
        'SELECT queues.`task_id` FROM %1$s queues WHERE queues.`task_id` IN(' . join(',', array_map('intval', $taskIds)) . ') ' .
        'AND queues.`count_total` > (SELECT COUNT(*) FROM %2$s subs WHERE subs.`task_id` = queues.`task_id`)',
        esc_sql($this->getTableName(SendingQueueEntity::class)),
        esc_sql($this->getTableName(ScheduledTaskSubscriberEntity::class))
      ));
    }

    if (!empty($taskIds)) {
      foreach ($taskIds as $taskId) {
        // abort if execution limit is reached
        $this->cronHelper->enforceExecutionLimit($timer);

        $this->migrateTaskSubscribers($taskId, $timer);
      }
    }

    return true;
  }

  public function migrateTaskSubscribers($taskId, $timer) {
    global $wpdb;

    $migratedUnprocessedCount = $this->scheduledTaskSubscribersRepository->countBy([
      'task' => $taskId,
      'processed' => ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED,
    ]);
    $migratedProcessedCount = $this->scheduledTaskSubscribersRepository->countBy([
      'task' => $taskId,
      'processed' => ScheduledTaskSubscriberEntity::STATUS_PROCESSED,
    ]);

    $table = $this->getTableName(SendingQueueEntity::class);
    // All parameters are safe
    // phpcs:ignore WordPressDotOrg.sniffs.DirectDB.UnescapedDBParameter
    $subscribers = $wpdb->get_var($wpdb->prepare(
      "SELECT `subscribers` FROM `$table` WHERE `task_id` = %d AND (`count_processed` > %d OR `count_to_process` > %d)",
      (int)$taskId,
      $migratedUnprocessedCount,
      $migratedProcessedCount
    ));

    // sanity check
    if (empty($subscribers)) {
      return false;
    }

    $subscribers = unserialize($subscribers);

    if (!empty($subscribers['to_process'])) {
      $subscribersToMigrate = array_slice($subscribers['to_process'], $migratedUnprocessedCount);
      foreach ($subscribersToMigrate as $subId) {
        // abort if execution limit is reached
        $this->cronHelper->enforceExecutionLimit($timer);

        $this->scheduledTaskSubscribersRepository->createOrUpdate([
          'task_id' => $taskId,
          'subscriber_id' => $subId,
          'processed' => ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED,
        ]);
      }
    }

    if (!empty($subscribers['processed'])) {
      $subscribersToMigrate = array_slice($subscribers['processed'], $migratedProcessedCount);
      foreach ($subscribersToMigrate as $subId) {
        // abort if execution limit is reached
        $this->cronHelper->enforceExecutionLimit($timer);

        $this->scheduledTaskSubscribersRepository->createOrUpdate([
          'task_id' => $taskId,
          'subscriber_id' => $subId,
          'processed' => ScheduledTaskSubscriberEntity::STATUS_PROCESSED,
        ]);
      }
    }

    return true;
  }

  public function getNextRunDate($wp = null) {
    if (is_null($wp)) {
      $wp = new WPFunctions();
    }
    // run migration immediately
    return Carbon::createFromTimestamp($wp->currentTime('timestamp'));
  }

  private function getTableName(string $entityName): string {
    return $this->entityManager->getClassMetadata($entityName)->getTableName();
  }
}