Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions lib/private/Files/Cache/BatchPropagator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
<?php
declare(strict_types=1);
/**
* SPDX-FileCopyrightText: 2026 Nextcloud GmbH and Nextcloud contributors
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

namespace OC\Files\Cache;

use OCP\DB\QueryBuilder\IQueryBuilder;
use OCP\IDBConnection;

class BatchPropagator {
/** @var array<int, array<string, array{hash: string, time: int, size: int}>> */
private array $parents = [];

public function __construct(
private readonly IDBConnection $connection,
) {
}

public function __destruct() {
$this->commit();
}

public function addParent(int $storageId, string $path, int $time, int $sizeDifference): void {
$this->parents[$storageId] ??= [];
if (!isset($this->parents[$storageId][$path])) {
$this->parents[$storageId][$path] = [
'hash' => md5($path),
'time' => $time,
'size' => $sizeDifference,
];
} else {
$this->parents[$storageId][$path]['size'] += $sizeDifference;
$this->parents[$storageId][$path]['time'] = max($this->parents[$storageId][$path]['time'], $time);
}
}

public function commit(): void {
// Ensure rows are always locked in the same order
uksort($this->parents, static fn(string $a, string $b) => $a <=> $b);

Check failure on line 42 in lib/private/Files/Cache/BatchPropagator.php

View workflow job for this annotation

GitHub Actions / static-code-analysis

InvalidScalarArgument

lib/private/Files/Cache/BatchPropagator.php:42:3: InvalidScalarArgument: Type int should be a subtype of string (see https://psalm.dev/012)
foreach ($this->parents as &$paths) {
uasort($paths, static fn(array $a, array $b) => $a['hash'] <=> $b['hash']);
}
unset($paths);

$etag = uniqid();

try {
$this->connection->beginTransaction();

foreach ($this->parents as $storageId => $parents) {
if ($this->connection->getDatabaseProvider() !== IDBConnection::PLATFORM_SQLITE) {
// Lock the rows before updating then with a SELECT FOR UPDATE
// The select also allow us to fetch the fileid and then use these in the UPDATE
// queries as a faster lookup than the path_hash
$hashes = array_map(static fn(array $a): string => $a['hash'], $parents);

foreach (array_chunk($hashes, 1000) as $hashesChunk) {
$query = $this->connection->getQueryBuilder();
$result = $query->select('storage', 'fileid', 'path', 'path_hash', 'size')
->from('filecache')
->orWhere()
->where($query->expr()->eq('storage', $query->createNamedParameter($storageId, IQueryBuilder::PARAM_INT)))
->andWhere($query->expr()->in('path_hash', $query->createNamedParameter($hashesChunk, IQueryBuilder::PARAM_STR_ARRAY)))
->orderBy('path_hash')
->forUpdate()
->executeQuery();

$query = $this->connection->getQueryBuilder();
$query->update('filecache')
->set('mtime', $query->func()->greatest('mtime', $query->createParameter('time')))
->set('etag', $query->expr()->literal($etag))
->where($query->expr()->eq('fileid', $query->createParameter('fileid')));

$queryWithSize = $this->connection->getQueryBuilder();
$queryWithSize->update('filecache')
->set('mtime', $queryWithSize->func()->greatest('mtime', $queryWithSize->createParameter('time')))
->set('etag', $queryWithSize->expr()->literal($etag))
->set('size', $queryWithSize->func()->add('size', $queryWithSize->createParameter('size')))
->where($queryWithSize->expr()->eq('fileid', $queryWithSize->createParameter('fileid')));

while ($row = $result->fetchAssociative()) {
$item = $this->parents[$row['storage']][$row['path']];
if ($row['size'] > -1) {
$queryWithSize->setParameter('fileid', $row['fileid'], IQueryBuilder::PARAM_INT)
->setParameter('size', $item['size'], IQueryBuilder::PARAM_INT)
->setParameter('time', $item['time'], IQueryBuilder::PARAM_INT)
->executeStatement();
} else {
$query->setParameter('fileid', $row['fileid'], IQueryBuilder::PARAM_INT)
->setParameter('time', $item['time'], IQueryBuilder::PARAM_INT)
->executeStatement();
}
}
}
} else {
// No FOR UPDATE support in Sqlite, but instead the whole table is locked
$query = $this->connection->getQueryBuilder();
$query->update('filecache')
->set('mtime', $query->func()->greatest('mtime', $query->createParameter('time')))
->set('etag', $query->expr()->literal($etag))
->where($query->expr()->eq('storage', $query->createNamedParameter($storageId, IQueryBuilder::PARAM_INT)))
->andWhere($query->expr()->eq('path_hash', $query->createParameter('hash')));

$queryWithSize = $this->connection->getQueryBuilder();
$queryWithSize->update('filecache')
->set('mtime', $queryWithSize->func()->greatest('mtime', $queryWithSize->createParameter('time')))
->set('etag', $queryWithSize->expr()->literal($etag))
->set('size', $queryWithSize->func()->add('size', $queryWithSize->createParameter('size')))
->where($queryWithSize->expr()->eq('storage', $queryWithSize->createNamedParameter($storageId, IQueryBuilder::PARAM_INT)))
->andWhere($queryWithSize->expr()->eq('path_hash', $queryWithSize->createParameter('hash')));

foreach ($parents as $item) {
if ($item['size']) {
$queryWithSize->setParameter('hash', $item['hash'], IQueryBuilder::PARAM_STR)
->setParameter('time', $item['time'], IQueryBuilder::PARAM_INT)
->setParameter('size', $item['size'], IQueryBuilder::PARAM_INT)
->executeStatement();
} else {
$query->setParameter('hash', $item['hash'], IQueryBuilder::PARAM_STR)
->setParameter('time', $item['time'], IQueryBuilder::PARAM_INT)
->executeStatement();
}
}
}
}

$this->parents = [];

$this->connection->commit();
} catch (\Exception $e) {
$this->connection->rollback();
throw $e;
}
}
}
4 changes: 2 additions & 2 deletions lib/private/Files/Cache/HomePropagator.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
use OCP\IDBConnection;

class HomePropagator extends Propagator {
public function __construct(IStorage $storage, IDBConnection $connection) {
parent::__construct($storage, $connection, ignore: ['files_encryption']);
public function __construct(IStorage $storage) {
parent::__construct($storage, ignore: ['files_encryption']);
}
}
206 changes: 6 additions & 200 deletions lib/private/Files/Cache/Propagator.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,15 @@
use Psr\Log\LoggerInterface;

class Propagator implements IPropagator {
public const MAX_RETRIES = 3;

private bool $inBatch = false;
private array $batch = [];
private ClockInterface $clock;
private BatchPropagator $batchPropagator;

public function __construct(
protected readonly IStorage $storage,
private readonly IDBConnection $connection,
private readonly array $ignore = [],
) {
$this->clock = Server::get(ClockInterface::class);
$this->batchPropagator = Server::get(BatchPropagator::class);
}

#[Override]
Expand Down Expand Up @@ -66,87 +63,8 @@ public function propagateChange(string $internalPath, int $time, int $sizeDiffer
return;
}

if ($this->inBatch) {
foreach ($parents as $parent) {
$this->addToBatch($parent, $time, $sizeDifference);
}
return;
}

$parentHashes = array_map('md5', $parents);
sort($parentHashes); // Ensure rows are always locked in the same order
$etag = uniqid(); // since we give all folders the same etag we don't ask the storage for the etag

$builder = $this->connection->getQueryBuilder();
$hashParams = array_map(static fn (string $hash): ILiteral => $builder->expr()->literal($hash), $parentHashes);

$builder->update('filecache')
->set('mtime', $builder->func()->greatest('mtime', $builder->createNamedParameter($time, IQueryBuilder::PARAM_INT)))
->where($builder->expr()->eq('storage', $builder->createNamedParameter($storageId, IQueryBuilder::PARAM_INT)))
->andWhere($builder->expr()->in('path_hash', $hashParams));
if (!$this->storage->instanceOfStorage(IReliableEtagStorage::class)) {
$builder->set('etag', $builder->createNamedParameter($etag, IQueryBuilder::PARAM_STR));
}

if ($sizeDifference !== 0) {
$hasCalculatedSize = $builder->expr()->gt('size', $builder->expr()->literal(-1, IQUeryBuilder::PARAM_INT));
$sizeColumn = $builder->getColumnName('size');
$newSize = $builder->func()->greatest(
$builder->func()->add('size', $builder->createNamedParameter($sizeDifference)),
$builder->createNamedParameter(-1, IQueryBuilder::PARAM_INT)
);

// Only update if row had a previously calculated size
$builder->set('size', $builder->createFunction("CASE WHEN $hasCalculatedSize THEN $newSize ELSE $sizeColumn END"));

if ($this->storage->instanceOfStorage(Encryption::class)) {
// in case of encryption being enabled after some files are already uploaded, some entries will have an unencrypted_size of 0 and a non-zero size
$hasUnencryptedSize = $builder->expr()->neq('unencrypted_size', $builder->expr()->literal(0, IQueryBuilder::PARAM_INT));
$sizeColumn = $builder->getColumnName('size');
$unencryptedSizeColumn = $builder->getColumnName('unencrypted_size');
$newUnencryptedSize = $builder->func()->greatest(
$builder->func()->add(
$builder->createFunction("CASE WHEN $hasUnencryptedSize THEN $unencryptedSizeColumn ELSE $sizeColumn END"),
$builder->createNamedParameter($sizeDifference)
),
$builder->createNamedParameter(-1, IQueryBuilder::PARAM_INT)
);

// Only update if row had a previously calculated size
$builder->set('unencrypted_size', $builder->createFunction("CASE WHEN $hasCalculatedSize THEN $newUnencryptedSize ELSE $unencryptedSizeColumn END"));
}
}

for ($i = 0; $i < self::MAX_RETRIES; $i++) {
try {
if ($this->connection->getDatabaseProvider() !== IDBConnection::PLATFORM_SQLITE) {
$this->connection->beginTransaction();
// Lock all the rows first with a SELECT FOR UPDATE ordered by path_hash
$forUpdate = $this->connection->getQueryBuilder();
$forUpdate->select('fileid')
->from('filecache')
->where($forUpdate->expr()->eq('storage', $forUpdate->createNamedParameter($storageId, IQueryBuilder::PARAM_INT)))
->andWhere($forUpdate->expr()->in('path_hash', $hashParams))
->orderBy('path_hash')
->forUpdate()
->executeQuery();
$builder->executeStatement();
$this->connection->commit();
} else {
$builder->executeStatement();
}
break;
} catch (DbalException $e) {
if ($this->connection->getDatabaseProvider() !== IDBConnection::PLATFORM_SQLITE) {
$this->connection->rollBack();
}
if (!$e->isRetryable()) {
throw $e;
}

$loggerInterface = Server::get(LoggerInterface::class);
$loggerInterface->warning('Retrying propagation query after retryable exception.', [ 'exception' => $e ]);
}
foreach ($parents as $parent) {
$this->batchPropagator->addParent($storageId, $parent, $time, $sizeDifference);
}
}

Expand All @@ -165,122 +83,10 @@ protected function getParents(string $path): array {
}

#[Override]
public function beginBatch(): void {
$this->inBatch = true;
}

private function addToBatch(string $internalPath, int $time, int $sizeDifference): void {
if (!isset($this->batch[$internalPath])) {
$this->batch[$internalPath] = [
'hash' => md5($internalPath),
'time' => $time,
'size' => $sizeDifference,
];
} else {
$this->batch[$internalPath]['size'] += $sizeDifference;
if ($time > $this->batch[$internalPath]['time']) {
$this->batch[$internalPath]['time'] = $time;
}
}
}
public function beginBatch(): void {}

#[Override]
public function commitBatch(): void {
if (!$this->inBatch) {
throw new \BadMethodCallException('Not in batch');
}
$this->inBatch = false;

// Ensure rows are always locked in the same order
uasort($this->batch, static fn (array $a, array $b) => $a['hash'] <=> $b['hash']);

try {
$this->connection->beginTransaction();

$storageId = $this->storage->getCache()->getNumericStorageId();

if ($this->connection->getDatabaseProvider() !== IDBConnection::PLATFORM_SQLITE) {
// Lock the rows before updating then with a SELECT FOR UPDATE
// The select also allow us to fetch the fileid and then use these in the UPDATE
// queries as a faster lookup than the path_hash
$hashes = array_map(static fn (array $a): string => $a['hash'], $this->batch);

foreach (array_chunk($hashes, 1000) as $hashesChunk) {
$query = $this->connection->getQueryBuilder();
$result = $query->select('fileid', 'path', 'path_hash', 'size')
->from('filecache')
->where($query->expr()->eq('storage', $query->createNamedParameter($storageId, IQueryBuilder::PARAM_INT)))
->andWhere($query->expr()->in('path_hash', $query->createNamedParameter($hashesChunk, IQueryBuilder::PARAM_STR_ARRAY)))
->orderBy('path_hash')
->forUpdate()
->executeQuery();

$query = $this->connection->getQueryBuilder();
$query->update('filecache')
->set('mtime', $query->func()->greatest('mtime', $query->createParameter('time')))
->set('etag', $query->expr()->literal(uniqid()))
->where($query->expr()->eq('storage', $query->createNamedParameter($storageId, IQueryBuilder::PARAM_INT)))
->andWhere($query->expr()->eq('fileid', $query->createParameter('fileid')));

$queryWithSize = $this->connection->getQueryBuilder();
$queryWithSize->update('filecache')
->set('mtime', $queryWithSize->func()->greatest('mtime', $queryWithSize->createParameter('time')))
->set('etag', $queryWithSize->expr()->literal(uniqid()))
->set('size', $queryWithSize->func()->add('size', $queryWithSize->createParameter('size')))
->where($queryWithSize->expr()->eq('storage', $queryWithSize->createNamedParameter($storageId, IQueryBuilder::PARAM_INT)))
->andWhere($queryWithSize->expr()->eq('fileid', $queryWithSize->createParameter('fileid')));

while ($row = $result->fetchAssociative()) {
$item = $this->batch[$row['path']];
if ($item['size'] && $row['size'] > -1) {
$queryWithSize->setParameter('fileid', $row['fileid'], IQueryBuilder::PARAM_INT)
->setParameter('size', $item['size'], IQueryBuilder::PARAM_INT)
->setParameter('time', $item['time'], IQueryBuilder::PARAM_INT)
->executeStatement();
} else {
$query->setParameter('fileid', $row['fileid'], IQueryBuilder::PARAM_INT)
->setParameter('time', $item['time'], IQueryBuilder::PARAM_INT)
->executeStatement();
}
}
}
} else {
// No FOR UPDATE support in Sqlite, but instead the whole table is locked
$query = $this->connection->getQueryBuilder();
$query->update('filecache')
->set('mtime', $query->func()->greatest('mtime', $query->createParameter('time')))
->set('etag', $query->expr()->literal(uniqid()))
->where($query->expr()->eq('storage', $query->createNamedParameter($storageId, IQueryBuilder::PARAM_INT)))
->andWhere($query->expr()->eq('path_hash', $query->createParameter('hash')));

$queryWithSize = $this->connection->getQueryBuilder();
$queryWithSize->update('filecache')
->set('mtime', $queryWithSize->func()->greatest('mtime', $queryWithSize->createParameter('time')))
->set('etag', $queryWithSize->expr()->literal(uniqid()))
->set('size', $queryWithSize->func()->add('size', $queryWithSize->createParameter('size')))
->where($queryWithSize->expr()->eq('storage', $queryWithSize->createNamedParameter($storageId, IQueryBuilder::PARAM_INT)))
->andWhere($queryWithSize->expr()->eq('path_hash', $queryWithSize->createParameter('hash')));

foreach ($this->batch as $item) {
if ($item['size']) {
$queryWithSize->setParameter('hash', $item['hash'], IQueryBuilder::PARAM_STR)
->setParameter('time', $item['time'], IQueryBuilder::PARAM_INT)
->setParameter('size', $item['size'], IQueryBuilder::PARAM_INT)
->executeStatement();
} else {
$query->setParameter('hash', $item['hash'], IQueryBuilder::PARAM_STR)
->setParameter('time', $item['time'], IQueryBuilder::PARAM_INT)
->executeStatement();
}
}
}

$this->batch = [];

$this->connection->commit();
} catch (\Exception $e) {
$this->connection->rollback();
throw $e;
}
$this->batchPropagator->commit();
}
}
2 changes: 1 addition & 1 deletion lib/private/Files/Storage/Common.php
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ public function getPropagator(?IStorage $storage = null): IPropagator {
/** @var self $storage */
if (!isset($storage->propagator)) {
$config = Server::get(IConfig::class);
$storage->propagator = new Propagator($storage, Server::get(IDBConnection::class), ['appdata_' . $config->getSystemValueString('instanceid')]);
$storage->propagator = new Propagator($storage, ['appdata_' . $config->getSystemValueString('instanceid')]);
}
return $storage->propagator;
}
Expand Down
Loading
Loading