<?php namespace MailPoet\Cron\Workers; if (!defined('ABSPATH')) exit; use MailPoet\Cron\CronHelper; use MailPoet\Cron\CronWorkerScheduler; use MailPoet\Entities\NewsletterEntity; use MailPoet\Entities\NewsletterSegmentEntity; use MailPoet\Entities\ScheduledTaskEntity; use MailPoet\Entities\SegmentEntity; use MailPoet\InvalidStateException; use MailPoet\Logging\LoggerFactory; use MailPoet\Models\Newsletter; use MailPoet\Models\ScheduledTask; use MailPoet\Models\Subscriber; use MailPoet\Models\SubscriberSegment; use MailPoet\Newsletter\NewslettersRepository; use MailPoet\Newsletter\Scheduler\PostNotificationScheduler; use MailPoet\Newsletter\Scheduler\Scheduler as NewsletterScheduler; use MailPoet\Newsletter\Scheduler\WelcomeScheduler; use MailPoet\Newsletter\Segment\NewsletterSegmentRepository; use MailPoet\Newsletter\Sending\ScheduledTasksRepository; use MailPoet\Segments\SegmentsRepository; use MailPoet\Segments\SubscribersFinder; use MailPoet\Tasks\Sending as SendingTask; use MailPoet\Util\Security; use MailPoet\WP\Functions as WPFunctions; use MailPoetVendor\Carbon\Carbon; class Scheduler { const TASK_BATCH_SIZE = 5; /** @var SubscribersFinder */ private $subscribersFinder; /** @var LoggerFactory */ private $loggerFactory; /** @var CronHelper */ private $cronHelper; /** @var CronWorkerScheduler */ private $cronWorkerScheduler; /** @var ScheduledTasksRepository */ private $scheduledTasksRepository; /** @var NewslettersRepository */ private $newslettersRepository; /** @var SegmentsRepository */ private $segmentsRepository; /** @var NewsletterSegmentRepository */ private $newsletterSegmentRepository; /** @var WPFunctions */ private $wp; /** @var Security */ private $security; /** @var NewsletterScheduler */ private $scheduler; public function __construct( SubscribersFinder $subscribersFinder, LoggerFactory $loggerFactory, CronHelper $cronHelper, CronWorkerScheduler $cronWorkerScheduler, ScheduledTasksRepository $scheduledTasksRepository, NewslettersRepository $newslettersRepository, SegmentsRepository $segmentsRepository, NewsletterSegmentRepository $newsletterSegmentRepository, WPFunctions $wp, Security $security, NewsletterScheduler $scheduler ) { $this->cronHelper = $cronHelper; $this->subscribersFinder = $subscribersFinder; $this->loggerFactory = $loggerFactory; $this->cronWorkerScheduler = $cronWorkerScheduler; $this->scheduledTasksRepository = $scheduledTasksRepository; $this->newslettersRepository = $newslettersRepository; $this->segmentsRepository = $segmentsRepository; $this->newsletterSegmentRepository = $newsletterSegmentRepository; $this->wp = $wp; $this->security = $security; $this->scheduler = $scheduler; } public function process($timer = false) { $timer = $timer ?: microtime(true); // abort if execution limit is reached $this->cronHelper->enforceExecutionLimit($timer); $scheduledTasks = $this->getScheduledSendingTasks(); if (!count($scheduledTasks)) return false; // To prevent big changes we convert ScheduledTaskEntity to old model $scheduledQueues = []; foreach ($scheduledTasks as $scheduledTask) { $task = ScheduledTask::findOne($scheduledTask->getId()); if (!$task) continue; $scheduledQueue = SendingTask::createFromScheduledTask($task); if (!$scheduledQueue) continue; $scheduledQueues[] = $scheduledQueue; } $this->updateTasks($scheduledTasks); foreach ($scheduledQueues as $i => $queue) { $newsletter = Newsletter::findOneWithOptions($queue->newsletterId); if (!$newsletter || $newsletter->deletedAt !== null) { $queue->delete(); } elseif ($newsletter->status !== NewsletterEntity::STATUS_ACTIVE && $newsletter->status !== NewsletterEntity::STATUS_SCHEDULED) { continue; } elseif ($newsletter->type === NewsletterEntity::TYPE_WELCOME) { $this->processWelcomeNewsletter($newsletter, $queue); } elseif ($newsletter->type === NewsletterEntity::TYPE_NOTIFICATION) { $this->processPostNotificationNewsletter($newsletter, $queue); } elseif ($newsletter->type === NewsletterEntity::TYPE_STANDARD) { $this->processScheduledStandardNewsletter($newsletter, $queue); } elseif ($newsletter->type === NewsletterEntity::TYPE_AUTOMATIC) { $this->processScheduledAutomaticEmail($newsletter, $queue); } elseif ($newsletter->type === NewsletterEntity::TYPE_RE_ENGAGEMENT) { $this->processReEngagementEmail($queue); } elseif ($newsletter->type === NewsletterEntity::TYPE_AUTOMATION) { $this->processScheduledAutomationEmail($queue); } $this->cronHelper->enforceExecutionLimit($timer); } } public function processWelcomeNewsletter($newsletter, $queue) { $subscribers = $queue->getSubscribers(); if (empty($subscribers[0])) { $queue->delete(); $this->updateScheduledTaskEntity($queue, true); return false; } $subscriberId = (int)$subscribers[0]; if ($newsletter->event === 'segment') { if ($this->verifyMailpoetSubscriber($subscriberId, $newsletter, $queue) === false) { return false; } } else { if ($newsletter->event === 'user') { if ($this->verifyWPSubscriber($subscriberId, $newsletter, $queue) === false) { return false; } } } $queue->status = null; $queue->save(); $this->updateScheduledTaskEntity($queue); return true; } public function processPostNotificationNewsletter($newsletter, SendingTask $queue) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_POST_NOTIFICATIONS)->info( 'process post notification in scheduler', ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId] ); $newsletterEntity = $this->newslettersRepository->findOneById($newsletter->id); if (!$newsletterEntity instanceof NewsletterEntity) { throw new InvalidStateException(); } // ensure that segments exist $segments = $newsletterEntity->getSegmentIds(); if (empty($segments)) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_POST_NOTIFICATIONS)->info( 'post notification no segments', ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId] ); return $this->deleteQueueOrUpdateNextRunDate($queue, $newsletter); } // ensure that subscribers are in segments $taskModel = $queue->task(); $taskEntity = $this->scheduledTasksRepository->findOneById($taskModel->id); if ($taskEntity instanceof ScheduledTaskEntity) { $subscribersCount = $this->subscribersFinder->addSubscribersToTaskFromSegments($taskEntity, $segments); } if (empty($subscribersCount)) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_POST_NOTIFICATIONS)->info( 'post notification no subscribers', ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId, 'segment_ids' => $segments] ); return $this->deleteQueueOrUpdateNextRunDate($queue, $newsletter); } // create a duplicate newsletter that acts as a history record try { $notificationHistory = $this->createPostNotificationHistory($newsletterEntity); } catch (\Exception $exception) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_POST_NOTIFICATIONS)->error( 'creating post notification history failed', ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId, 'error' => $exception->getMessage()] ); return false; } // queue newsletter for delivery $queue->newsletterId = (int)$notificationHistory->getId(); $queue->updateCount(); $queue->status = null; $queue->save(); $this->updateScheduledTaskEntity($queue); // Because there is mixed usage of the old and new model, we want to be sure about the correct state $this->newslettersRepository->refresh($notificationHistory); $queue->getSendingQueueEntity(); // This call refreshes sending queue entity $this->loggerFactory->getLogger(LoggerFactory::TOPIC_POST_NOTIFICATIONS)->info( 'post notification set status to sending', ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId] ); return true; } public function processScheduledAutomaticEmail($newsletter, $queue) { if ($newsletter->sendTo === 'segment') { $segment = $this->segmentsRepository->findOneById($newsletter->segment); if ($segment instanceof SegmentEntity) { $taskModel = $queue->task(); $taskEntity = $this->scheduledTasksRepository->findOneById($taskModel->id); if ($taskEntity instanceof ScheduledTaskEntity) { $result = $this->subscribersFinder->addSubscribersToTaskFromSegments($taskEntity, [(int)$segment->getId()]); } if (empty($result)) { $queue->delete(); $this->updateScheduledTaskEntity($queue, true); return false; } } } else { $subscribers = $queue->getSubscribers(); $subscriber = (!empty($subscribers) && is_array($subscribers)) ? Subscriber::findOne($subscribers[0]) : false; if (!$subscriber) { $queue->delete(); $this->updateScheduledTaskEntity($queue, true); return false; } if ($this->verifySubscriber($subscriber, $queue) === false) { return false; } } $queue->status = null; $queue->save(); $this->updateScheduledTaskEntity($queue); return true; } public function processScheduledAutomationEmail($queue): bool { $subscribers = $queue->getSubscribers(); $subscriber = (!empty($subscribers) && is_array($subscribers)) ? Subscriber::findOne($subscribers[0]) : null; if (!$subscriber) { $queue->delete(); $this->updateScheduledTaskEntity($queue, true); return false; } if (!$this->verifySubscriber($subscriber, $queue)) { return false; } $queue->status = null; $queue->save(); $this->updateScheduledTaskEntity($queue); return true; } public function processScheduledStandardNewsletter($newsletter, SendingTask $task) { $newsletterEntity = $this->newslettersRepository->findOneById($newsletter->id); $taskEntity = null; if ($newsletterEntity instanceof NewsletterEntity) { $segments = $newsletterEntity->getSegmentIds(); $taskModel = $task->task(); $taskEntity = $this->scheduledTasksRepository->findOneById($taskModel->id); if ($taskEntity instanceof ScheduledTaskEntity) { $this->subscribersFinder->addSubscribersToTaskFromSegments($taskEntity, $segments); } } // update current queue $task->updateCount(); $task->status = null; $task->save(); // update newsletter status $newsletter->setStatus(Newsletter::STATUS_SENDING); $newsletterEntity && $this->newslettersRepository->refresh($newsletterEntity); $this->updateScheduledTaskEntity($task); return true; } private function processReEngagementEmail($queue) { $queue->status = null; $queue->save(); $this->updateScheduledTaskEntity($queue); return true; } public function verifyMailpoetSubscriber($subscriberId, $newsletter, $queue) { $subscriber = Subscriber::findOne($subscriberId); // check if subscriber is in proper segment $subscriberInSegment = SubscriberSegment::where('subscriber_id', $subscriberId) ->where('segment_id', $newsletter->segment) ->where('status', 'subscribed') ->findOne(); if (!$subscriber || !$subscriberInSegment) { $queue->delete(); return false; } return $this->verifySubscriber($subscriber, $queue); } public function verifyWPSubscriber($subscriberId, $newsletter, $queue) { // check if user has the proper role $subscriber = Subscriber::findOne($subscriberId); if (!$subscriber || $subscriber->isWPUser() === false) { $queue->delete(); return false; } $wpUser = get_userdata($subscriber->wpUserId); if ($wpUser === false) { $queue->delete(); return false; } if ( $newsletter->role !== WelcomeScheduler::WORDPRESS_ALL_ROLES && !in_array($newsletter->role, ((array)$wpUser)['roles']) ) { $queue->delete(); return false; } return $this->verifySubscriber($subscriber, $queue); } public function verifySubscriber($subscriber, $queue) { if ($subscriber->status === Subscriber::STATUS_UNCONFIRMED) { // reschedule delivery $task = $this->scheduledTasksRepository->findOneById($queue->task()->id); if ($task instanceof ScheduledTaskEntity) { $this->cronWorkerScheduler->rescheduleProgressively($task); } return false; } else if ($subscriber->status === Subscriber::STATUS_UNSUBSCRIBED) { $queue->delete(); return false; } return true; } public function deleteQueueOrUpdateNextRunDate($queue, $newsletter) { if ($newsletter->intervalType === PostNotificationScheduler::INTERVAL_IMMEDIATELY) { $queue->delete(); $this->updateScheduledTaskEntity($queue, true); return; } else { $nextRunDate = $this->scheduler->getNextRunDate($newsletter->schedule); if (!$nextRunDate) { $queue->delete(); $this->updateScheduledTaskEntity($queue, true); return; } $queue->scheduledAt = $nextRunDate; $queue->save(); $this->updateScheduledTaskEntity($queue); } } private function updateScheduledTaskEntity(SendingTask $queue, bool $hasBeenDeleted = false) { $taskModel = $queue->task(); if (!$taskModel instanceof ScheduledTask) { return; } $taskEntity = $this->scheduledTasksRepository->findOneById($taskModel->id); if (!$taskEntity instanceof ScheduledTaskEntity) { return; } $hasBeenDeleted ? $this->scheduledTasksRepository->detach($taskEntity) : $this->scheduledTasksRepository->refresh($taskEntity); } public function createPostNotificationHistory(NewsletterEntity $newsletter): NewsletterEntity { // clone newsletter $notificationHistory = clone $newsletter; $notificationHistory->setParent($newsletter); $notificationHistory->setType(NewsletterEntity::TYPE_NOTIFICATION_HISTORY); $notificationHistory->setStatus(NewsletterEntity::STATUS_SENDING); $notificationHistory->setUnsubscribeToken($this->security->generateUnsubscribeTokenByEntity($notificationHistory)); // reset timestamps $createdAt = Carbon::createFromTimestamp($this->wp->currentTime('timestamp')); $notificationHistory->setCreatedAt($createdAt); $notificationHistory->setUpdatedAt($createdAt); $notificationHistory->setDeletedAt(null); // reset hash $notificationHistory->setHash(Security::generateHash()); $this->newslettersRepository->persist($notificationHistory); $this->newslettersRepository->flush(); // create relationships between notification history and segments foreach ($newsletter->getNewsletterSegments() as $newsletterSegment) { $segment = $newsletterSegment->getSegment(); if (!$segment) { continue; } $duplicateSegment = new NewsletterSegmentEntity($notificationHistory, $segment); $notificationHistory->getNewsletterSegments()->add($duplicateSegment); $this->newsletterSegmentRepository->persist($duplicateSegment); } $this->newslettersRepository->flush(); return $notificationHistory; } /** * @param ScheduledTaskEntity[] $scheduledTasks */ private function updateTasks(array $scheduledTasks): void { $ids = array_map(function (ScheduledTaskEntity $scheduledTask): ?int { return $scheduledTask->getId(); }, $scheduledTasks); $ids = array_filter($ids); $this->scheduledTasksRepository->touchAllByIds($ids); } /** * @return ScheduledTaskEntity[] */ public function getScheduledSendingTasks(): array { return $this->scheduledTasksRepository->findScheduledSendingTasks(self::TASK_BATCH_SIZE); } }