Skip to content

Commit

Permalink
Fix race condition on concurrent IPC connects
Browse files Browse the repository at this point in the history
We write the key to the child [1] before registering it to be received, which is inside `accept` [2].

If the child connects fast enough, this will result in a race condition. The race condition will only happen if there are multiple concurrent accepts, because we start to accept only after registering the key [3]. With concurrent pending connects, we might already be in the accept loop.

Fixes #199.

[1] https://github.com/amphp/parallel/blob/ffda869c33c30627b6eb5c25f096882d885681dc/src/Context/ProcessContext.php#L159
[2] https://github.com/amphp/parallel/blob/ffda869c33c30627b6eb5c25f096882d885681dc/src/Ipc/SocketIpcHub.php#L172
[3] https://github.com/amphp/parallel/blob/ffda869c33c30627b6eb5c25f096882d885681dc/src/Ipc/SocketIpcHub.php#L168
  • Loading branch information
kelunik committed Mar 19, 2024
1 parent ffda869 commit 9403846
Showing 1 changed file with 40 additions and 39 deletions.
79 changes: 40 additions & 39 deletions src/Ipc/SocketIpcHub.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

namespace Amp\Parallel\Ipc;

use Amp\Cache\LocalCache;
use Amp\Cancellation;
use Amp\DeferredFuture;
use Amp\CancelledException;
use Amp\ForbidCloning;
use Amp\ForbidSerialization;
use Amp\NullCancellation;
use Amp\Socket;
use Amp\Socket\ResourceSocket;
use Amp\Socket\SocketAddressType;
Expand All @@ -20,22 +22,19 @@ final class SocketIpcHub implements IpcHub
public const DEFAULT_KEY_RECEIVE_TIMEOUT = 5;
public const DEFAULT_KEY_LENGTH = 64;

private int $nextId = 0;

/** @var non-empty-string */
private readonly string $uri;

/** @var array<string, int> */
private array $keys = [];

/** @var array<int, DeferredFuture> */
private array $pending = [];
/** @var array<string, EventLoop\Suspension> */
private array $waitingByKey = [];

/** @var \Closure(): void */
private readonly \Closure $accept;

private bool $queued = false;

private LocalCache $clientsByKey;

/**
* @param float $keyReceiveTimeout Timeout to receive the key on accepted connections.
* @param positive-int $keyLength Length of the random key exchanged on the IPC channel when connecting.
Expand All @@ -51,24 +50,26 @@ public function __construct(
SocketAddressType::Internet => 'tcp://' . $address->toString(),
};

$this->clientsByKey = new LocalCache(1024, $keyReceiveTimeout);

$queued = &$this->queued;
$keys = &$this->keys;
$pending = &$this->pending;
$waitingByKey = &$this->waitingByKey;
$clientsByKey = &$this->clientsByKey;
$this->accept = static function () use (
&$queued,
&$keys,
&$pending,
&$waitingByKey,
&$clientsByKey,
$server,
$keyReceiveTimeout,
$keyLength,
): void {
while ($pending) {
while ($waitingByKey) {
$client = $server->accept();
if (!$client) {
$queued = false;
$exception = new Socket\SocketException('IPC socket closed before the client connected');
foreach ($pending as $deferred) {
$deferred->error($exception);
foreach ($waitingByKey as $suspension) {
$suspension->throw($exception);
}
return;
}
Expand All @@ -80,22 +81,12 @@ public function __construct(
continue; // Ignore possible foreign connection attempt.
}

$id = $keys[$received] ?? null;

if ($id === null) {
$client->close();
continue; // Ignore possible foreign connection attempt.
if (isset($waitingByKey[$received])) {
$waitingByKey[$received]->resume($client);
unset($waitingByKey[$received]);
} else {
$clientsByKey->set($received, $client);
}

$deferred = $pending[$id] ?? null;
unset($pending[$id], $keys[$received]);

if ($deferred === null) {
$client->close();
continue; // Client accept cancelled.
}

$deferred->complete($client);
}

$queued = false;
Expand All @@ -116,13 +107,13 @@ public function close(): void
{
$this->server->close();

if (!$this->pending) {
if (!$this->waitingByKey) {
return;
}

$exception = new Socket\SocketException('IPC socket closed before the client connected');
foreach ($this->pending as $deferred) {
$deferred->error($exception);
foreach ($this->waitingByKey as $suspension) {
$suspension->throw($exception);
}
}

Expand Down Expand Up @@ -158,24 +149,34 @@ public function accept(string $key, ?Cancellation $cancellation = null): Resourc
));
}

if (isset($this->keys[$key])) {
if (isset($this->waitingByKey[$key])) {
throw new \Error("An accept is already pending for the given key");
}

$id = $this->nextId++;
$client = $this->clientsByKey->get($key);
if ($client) {
$this->clientsByKey->delete($key);

return $client;
}

if (!$this->queued) {
EventLoop::queue($this->accept);
$this->queued = true;
}

$this->keys[$key] = $id;
$this->pending[$id] = $deferred = new DeferredFuture();
$this->waitingByKey[$key] = $suspension = EventLoop::getSuspension();

$cancellation = $cancellation ?? new NullCancellation();
$cancellationId = $cancellation->subscribe(function (CancelledException $exception) use ($suspension) {
$suspension->throw($exception);
});

try {
$client = $deferred->getFuture()->await($cancellation);
$client = $suspension->suspend();
} finally {
unset($this->pending[$id], $this->keys[$key]);
$cancellation->unsubscribe($cancellationId);
unset($this->waitingByKey[$key]);
}

return $client;
Expand Down

0 comments on commit 9403846

Please sign in to comment.