factory = $factory; $this->connection = $connection; $this->table = $table; } /** * Retrieve a list of batches. * * @param int $limit * @param mixed $before * @return \Illuminate\Bus\Batch[] */ public function get($limit = 50, $before = null) { return $this->connection->table($this->table) ->orderByDesc('id') ->take($limit) ->when($before, fn ($q) => $q->where('id', '<', $before)) ->get() ->map(function ($batch) { return $this->toBatch($batch); }) ->all(); } /** * Retrieve information about an existing batch. * * @param string $batchId * @return \Illuminate\Bus\Batch|null */ public function find(string $batchId) { $batch = $this->connection->table($this->table) ->useWritePdo() ->where('id', $batchId) ->first(); if ($batch) { return $this->toBatch($batch); } } /** * Store a new pending batch. * * @param \Illuminate\Bus\PendingBatch $batch * @return \Illuminate\Bus\Batch */ public function store(PendingBatch $batch) { $id = (string) Str::orderedUuid(); $this->connection->table($this->table)->insert([ 'id' => $id, 'name' => $batch->name, 'total_jobs' => 0, 'pending_jobs' => 0, 'failed_jobs' => 0, 'failed_job_ids' => '[]', 'options' => $this->serialize($batch->options), 'created_at' => time(), 'cancelled_at' => null, 'finished_at' => null, ]); return $this->find($id); } /** * Increment the total number of jobs within the batch. * * @param string $batchId * @param int $amount * @return void */ public function incrementTotalJobs(string $batchId, int $amount) { $this->connection->table($this->table)->where('id', $batchId)->update([ 'total_jobs' => new Expression('total_jobs + '.$amount), 'pending_jobs' => new Expression('pending_jobs + '.$amount), 'finished_at' => null, ]); } /** * Decrement the total number of pending jobs for the batch. * * @param string $batchId * @param string $jobId * @return \Illuminate\Bus\UpdatedBatchJobCounts */ public function decrementPendingJobs(string $batchId, string $jobId) { $values = $this->updateAtomicValues($batchId, function ($batch) use ($jobId) { return [ 'pending_jobs' => $batch->pending_jobs - 1, 'failed_jobs' => $batch->failed_jobs, 'failed_job_ids' => json_encode(array_values(array_diff(json_decode($batch->failed_job_ids, true), [$jobId]))), ]; }); return new UpdatedBatchJobCounts( $values['pending_jobs'], $values['failed_jobs'] ); } /** * Increment the total number of failed jobs for the batch. * * @param string $batchId * @param string $jobId * @return \Illuminate\Bus\UpdatedBatchJobCounts */ public function incrementFailedJobs(string $batchId, string $jobId) { $values = $this->updateAtomicValues($batchId, function ($batch) use ($jobId) { return [ 'pending_jobs' => $batch->pending_jobs, 'failed_jobs' => $batch->failed_jobs + 1, 'failed_job_ids' => json_encode(array_values(array_unique(array_merge(json_decode($batch->failed_job_ids, true), [$jobId])))), ]; }); return new UpdatedBatchJobCounts( $values['pending_jobs'], $values['failed_jobs'] ); } /** * Update an atomic value within the batch. * * @param string $batchId * @param \Closure $callback * @return int|null */ protected function updateAtomicValues(string $batchId, Closure $callback) { return $this->connection->transaction(function () use ($batchId, $callback) { $batch = $this->connection->table($this->table)->where('id', $batchId) ->lockForUpdate() ->first(); return is_null($batch) ? [] : tap($callback($batch), function ($values) use ($batchId) { $this->connection->table($this->table)->where('id', $batchId)->update($values); }); }); } /** * Mark the batch that has the given ID as finished. * * @param string $batchId * @return void */ public function markAsFinished(string $batchId) { $this->connection->table($this->table)->where('id', $batchId)->update([ 'finished_at' => time(), ]); } /** * Cancel the batch that has the given ID. * * @param string $batchId * @return void */ public function cancel(string $batchId) { $this->connection->table($this->table)->where('id', $batchId)->update([ 'cancelled_at' => time(), 'finished_at' => time(), ]); } /** * Delete the batch that has the given ID. * * @param string $batchId * @return void */ public function delete(string $batchId) { $this->connection->table($this->table)->where('id', $batchId)->delete(); } /** * Prune all of the entries older than the given date. * * @param \DateTimeInterface $before * @return int */ public function prune(DateTimeInterface $before) { $query = $this->connection->table($this->table) ->whereNotNull('finished_at') ->where('finished_at', '<', $before->getTimestamp()); $totalDeleted = 0; do { $deleted = $query->take(1000)->delete(); $totalDeleted += $deleted; } while ($deleted !== 0); return $totalDeleted; } /** * Prune all of the unfinished entries older than the given date. * * @param \DateTimeInterface $before * @return int */ public function pruneUnfinished(DateTimeInterface $before) { $query = $this->connection->table($this->table) ->whereNull('finished_at') ->where('created_at', '<', $before->getTimestamp()); $totalDeleted = 0; do { $deleted = $query->take(1000)->delete(); $totalDeleted += $deleted; } while ($deleted !== 0); return $totalDeleted; } /** * Prune all of the cancelled entries older than the given date. * * @param \DateTimeInterface $before * @return int */ public function pruneCancelled(DateTimeInterface $before) { $query = $this->connection->table($this->table) ->whereNotNull('cancelled_at') ->where('created_at', '<', $before->getTimestamp()); $totalDeleted = 0; do { $deleted = $query->take(1000)->delete(); $totalDeleted += $deleted; } while ($deleted !== 0); return $totalDeleted; } /** * Execute the given Closure within a storage specific transaction. * * @param \Closure $callback * @return mixed */ public function transaction(Closure $callback) { return $this->connection->transaction(fn () => $callback()); } /** * Serialize the given value. * * @param mixed $value * @return string */ protected function serialize($value) { $serialized = serialize($value); return $this->connection instanceof PostgresConnection ? base64_encode($serialized) : $serialized; } /** * Unserialize the given value. * * @param string $serialized * @return mixed */ protected function unserialize($serialized) { if ($this->connection instanceof PostgresConnection && ! Str::contains($serialized, [':', ';'])) { $serialized = base64_decode($serialized); } return unserialize($serialized); } /** * Convert the given raw batch to a Batch object. * * @param object $batch * @return \Illuminate\Bus\Batch */ protected function toBatch($batch) { return $this->factory->make( $this, $batch->id, $batch->name, (int) $batch->total_jobs, (int) $batch->pending_jobs, (int) $batch->failed_jobs, json_decode($batch->failed_job_ids, true), $this->unserialize($batch->options), CarbonImmutable::createFromTimestamp($batch->created_at), $batch->cancelled_at ? CarbonImmutable::createFromTimestamp($batch->cancelled_at) : $batch->cancelled_at, $batch->finished_at ? CarbonImmutable::createFromTimestamp($batch->finished_at) : $batch->finished_at ); } /** * Get the underlying database connection. * * @return \Illuminate\Database\Connection */ public function getConnection() { return $this->connection; } /** * Set the underlying database connection. * * @param \Illuminate\Database\Connection $connection * @return void */ public function setConnection(Connection $connection) { $this->connection = $connection; } }