Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 117 additions & 4 deletions packages/join-block/src/Services/ActionNetworkService.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@

class ActionNetworkService
{
// Bounds for getPersonTagNames(). Action Network's default page size is 25,
// so 10 pages covers people with up to ~250 taggings. People with more than
// that fall back to the un-optimised path (just call the API and let
// Action Network handle the no-op).
private const MAX_TAGGING_PAGES = 10;
private const HTTP_TIMEOUT_SECONDS = 10;

// Sentinel returned by getPersonTagNames() when enumeration was aborted
// (page cap hit, HTTP failure). Distinct from null (person not found) and
// from [] (person has zero tags).
private const TAG_ENUMERATION_ABORTED = false;

public static function signup($data)
{
global $joinBlockLog;
Expand Down Expand Up @@ -117,6 +129,11 @@ public static function signup($data)
}

public static function personExists($email)
{
return self::getPerson($email) !== null;
}

private static function getPerson($email)
{
global $joinBlockLog;

Expand All @@ -127,7 +144,7 @@ public static function personExists($email)
];

// TODO: remove after REI debugging complete
$joinBlockLog->info("Action Network personExists request query: " . json_encode($query));
$joinBlockLog->info("Action Network getPerson request query: " . json_encode($query));

$response = $client->request(
"GET",
Expand All @@ -141,17 +158,104 @@ public static function personExists($email)
);

$data = json_decode($response->getBody()->getContents(), true);
return !empty($data["_embedded"]["osdi:people"]);
$people = $data["_embedded"]["osdi:people"] ?? [];
return $people[0] ?? null;
}

/**
* Returns the names of tags applied to the person, or null if the person
* does not exist, or self::TAG_ENUMERATION_ABORTED (false) if we hit the
* page cap or an HTTP error. Callers should treat the aborted sentinel as
* "unknown" and fall back to calling the underlying API anyway.
*/
private static function getPersonTagNames($email)
{
global $joinBlockLog;

$person = self::getPerson($email);
if ($person === null) {
return null;
}

$taggingsHref = $person["_links"]["osdi:taggings"]["href"] ?? null;
if (!$taggingsHref) {
return [];
}

$client = new Client();
$requestOptions = [
"headers" => ["OSDI-API-Token" => Settings::get("ACTION_NETWORK_API_KEY")],
"timeout" => self::HTTP_TIMEOUT_SECONDS,
"connect_timeout" => self::HTTP_TIMEOUT_SECONDS,
];

$tagNames = [];
$nextHref = $taggingsHref;
$page = 0;
try {
while ($nextHref) {
if ($page >= self::MAX_TAGGING_PAGES) {
$joinBlockLog->warning(
"Action Network getPersonTagNames($email): aborting enumeration after " .
self::MAX_TAGGING_PAGES . " pages — falling back to un-optimised path"
);
return self::TAG_ENUMERATION_ABORTED;
}
$page++;

$response = $client->request("GET", $nextHref, $requestOptions);
$body = json_decode($response->getBody()->getContents(), true);
$taggings = $body["_embedded"]["osdi:taggings"] ?? [];
foreach ($taggings as $tagging) {
$tagHref = $tagging["_links"]["osdi:tag"]["href"] ?? null;
if (!$tagHref) {
continue;
}
$tagResponse = $client->request("GET", $tagHref, $requestOptions);
$tagBody = json_decode($tagResponse->getBody()->getContents(), true);
if (!empty($tagBody["name"])) {
$tagNames[] = $tagBody["name"];
}
}
$candidateNext = $body["_links"]["next"]["href"] ?? null;
// Defensive: Action Network shouldn't return a cyclic next link,
// but guard against it explicitly rather than spinning.
if ($candidateNext === $nextHref) {
$joinBlockLog->warning(
"Action Network getPersonTagNames($email): next link did not advance — aborting enumeration"
);
return self::TAG_ENUMERATION_ABORTED;
}
$nextHref = $candidateNext;
}
} catch (\Exception $e) {
$joinBlockLog->warning(
"Action Network getPersonTagNames($email): enumeration failed (" . $e->getMessage() .
") — falling back to un-optimised path"
);
return self::TAG_ENUMERATION_ABORTED;
}

return $tagNames;
}

// Callers must hold the per-email JoinService lock — see JoinService::acquireLock().
// The get/check/post sequence is otherwise a TOCTOU race.
public static function addTag($email, $tag)
{
global $joinBlockLog;

if (!self::personExists($email)) {
$tagNames = self::getPersonTagNames($email);
if ($tagNames === null) {
$joinBlockLog->warning("Skipping Action Network addTag('$tag') for $email: person does not exist");
return;
}
if (is_array($tagNames) && in_array($tag, $tagNames, true)) {
$joinBlockLog->info("Skipping Action Network addTag('$tag') for $email: tag already applied");
return;
}
// $tagNames === TAG_ENUMERATION_ABORTED falls through and we apply the
// tag anyway — Action Network treats a repeat add as a no-op.

$client = new Client();

Expand Down Expand Up @@ -182,14 +286,23 @@ public static function addTag($email, $tag)
);
}

// Callers must hold the per-email JoinService lock — see JoinService::acquireLock().
// The get/check/post sequence is otherwise a TOCTOU race.
public static function removeTag($email, $tag)
{
global $joinBlockLog;

if (!self::personExists($email)) {
$tagNames = self::getPersonTagNames($email);
if ($tagNames === null) {
$joinBlockLog->warning("Skipping Action Network removeTag('$tag') for $email: person does not exist");
return;
}
if (is_array($tagNames) && !in_array($tag, $tagNames, true)) {
$joinBlockLog->info("Skipping Action Network removeTag('$tag') for $email: tag not applied");
return;
}
// $tagNames === TAG_ENUMERATION_ABORTED falls through and we send the
// remove anyway — Action Network treats removing an absent tag as a no-op.

$client = new Client();

Expand Down
53 changes: 35 additions & 18 deletions packages/join-block/src/Services/JoinService.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,51 +22,68 @@ private static function formatDob($day, $month, $year)

public static function handleJoin($data)
{
global $joinBlockLog;

$lockFile = null;
try {
$sessionToken = $data['sessionToken'] ?? null;
$lockFile = self::lockSession($sessionToken);
$lockKey = $data['email'] ?? null;
if (!$lockKey) {
$lockKey = $data['sessionToken'] ?? null;
if ($lockKey) {
$joinBlockLog->warning("handleJoin called without email; falling back to sessionToken for lock key");
}
}
$lockFile = self::acquireLock($lockKey);
$chargebeeCustomer = self::tryHandleJoin($data);
do_action('ck_join_flow_success', $data, $chargebeeCustomer);
} catch (\Exception $e) {
do_action('ck_join_flow_error', $data, $e);
throw $e;
} finally {
self::unlockSession($lockFile);
self::releaseLock($lockFile);
}
return $chargebeeCustomer;
}

/**
* This is a BLOCKING lock, which means threads will sleep
* until they can get the lock. This forces sequential execution,
* which means no race conditions.
* Acquire a BLOCKING exclusive lock keyed by an opaque string (typically
* an email address). Threads sleep until they can get the lock, forcing
* sequential execution and avoiding race conditions on per-person CRM
* mutations across the /join endpoint and webhook handlers.
*
* We still need to handle duplicate join requests, by
* e.g. making sure the code doesn't create a subscription
* if one already exists.
*
*
* NOTE: flock() is per-host. If this app is ever deployed across multiple
* PHP-FPM hosts, this lock no longer protects — a distributed lock
* (Redis, DB row lock) would be needed.
*
* @return resource The file handle of the lock file
*/
public static function lockSession($sessionToken)
public static function acquireLock($key)
{
global $joinBlockLog;

if (!$sessionToken) {
throw new \Exception("Unable to lock session: no token provided");
if (!$key) {
throw new \Exception("Unable to acquire lock: no key provided");
}

$joinBlockLog->info("Locking session $sessionToken");
// Normalize so emails with differing case / whitespace collide on the
// same lock, and so the resulting filename is safe for any tmp dir.
$normalizedKey = sha1(strtolower(trim((string) $key)));

$joinBlockLog->info("Locking key $normalizedKey");

// Use WordPress get_temp_dir() as lock directory, this must be writable
// otherwise many WordPress features do not work (e.g. file uploads)
$lockFilepath = get_temp_dir() . '/' . $sessionToken;
$lockFilepath = get_temp_dir() . '/join-lock-' . $normalizedKey;
// Ignore fopen() error, as it is necessary for flock()
// phpcs:ignore WordPress.WP.AlternativeFunctions.file_system_operations_fopen
$lockFile = fopen($lockFilepath, 'w');

if (!$lockFile) {
$joinBlockLog->error("Could not use lockfile for session $sessionToken");
$joinBlockLog->error("Could not use lockfile for key $normalizedKey");
throw new \Exception("Unable to open lock file: " . esc_html($lockFilepath));
}

Expand All @@ -78,11 +95,11 @@ public static function lockSession($sessionToken)
if (!$lockSuccess) {
// phpcs:ignore WordPress.WP.AlternativeFunctions.file_system_operations_fclose
fclose($lockFile);
$joinBlockLog->error("Could not lock session $sessionToken");
throw new \Exception("Unable to lock session: " . esc_html($sessionToken));
$joinBlockLog->error("Could not lock key $normalizedKey");
throw new \Exception("Unable to acquire lock: " . esc_html($normalizedKey));
}

$joinBlockLog->info("Locked session $sessionToken");
$joinBlockLog->info("Locked key $normalizedKey");

// Lock acquired
return $lockFile;
Expand All @@ -91,7 +108,7 @@ public static function lockSession($sessionToken)
/**
* @param resource $lockFile The file handle of the lock file
*/
public static function unlockSession($lockFile)
public static function releaseLock($lockFile)
{
global $joinBlockLog;

Expand All @@ -111,7 +128,7 @@ public static function unlockSession($lockFile)
// See: https://www.man7.org/linux/man-pages/man2/flock.2.html
// phpcs:ignore WordPress.WP.AlternativeFunctions.unlink_unlink
@unlink($fileInfo['uri']);
$joinBlockLog->info("Unlocked session {$fileInfo['uri']}");
$joinBlockLog->info("Unlocked {$fileInfo['uri']}");
}

/**
Expand Down
14 changes: 14 additions & 0 deletions packages/join-block/src/Services/StripeService.php
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,11 @@ public static function handleWebhook($event)
$customerId = null;
$customerLapsed = false;
$lapseTrigger = null;
// Per-email lock acquired lazily when we resolve the customer's
// email. Serialises CRM mutations against the /join endpoint and
// other concurrent webhook deliveries for the same person.
// See JoinService::acquireLock().
$lockFile = null;

try {
switch ($event['type']) {
Expand Down Expand Up @@ -792,6 +797,7 @@ public static function handleWebhook($event)
if (!empty($invoice['customer'])) {
$email = self::getEmailForCustomer($customerId);
if ($email) {
$lockFile = JoinService::acquireLock($email);
$context = ['provider' => 'stripe', 'trigger' => 'invoice_payment_failed_retry_scheduled', 'event' => $event];
if (JoinService::shouldMarkMemberLapsing($email, $context)) {
JoinService::toggleMemberLapsing($email, true, $context);
Expand All @@ -808,6 +814,7 @@ public static function handleWebhook($event)
if (!empty($invoice['customer'])) {
$email = self::getEmailForCustomer($customerId);
if ($email) {
$lockFile = JoinService::acquireLock($email);
$context = ['provider' => 'stripe', 'trigger' => 'invoice_paid', 'event' => $event];
if (JoinService::shouldUnlapseMember($email, $context)) {
JoinService::toggleMemberLapsed($email, false, null, $context);
Expand All @@ -830,6 +837,8 @@ public static function handleWebhook($event)

$joinBlockLog->info("Syncing updated customer details for Stripe customer {$customer['id']} ($email)");

$lockFile = JoinService::acquireLock($email);
Comment on lines 837 to +840

$personData = self::extractPersonDataFromStripeCustomer($customer);
$mergeFields = self::extractMailchimpMergeFieldsFromStripeCustomer($customer);

Expand Down Expand Up @@ -868,6 +877,7 @@ public static function handleWebhook($event)
$email = self::getEmailForCustomer($cid);

if ($email) {
$lockFile = JoinService::acquireLock($email);
$activeStatuses = ['active', 'trialing'];
$lapsedStatuses = ['unpaid', 'incomplete_expired'];
$lapsingStatuses = ['past_due'];
Expand Down Expand Up @@ -906,6 +916,7 @@ public static function handleWebhook($event)
$joinBlockLog->warning("Tier change detected but could not resolve email for customer {$subscription['customer']}");
break;
}
$lockFile = $lockFile ?: JoinService::acquireLock($email);

['previousPriceId' => $previousPriceId, 'currentPriceId' => $currentPriceId] = $priceChange;
$newPlan = Settings::getMembershipPlanByPriceId($currentPriceId);
Expand Down Expand Up @@ -968,6 +979,7 @@ public static function handleWebhook($event)
if ($customerLapsed) {
$email = self::getEmailForCustomer($customerId);
if ($email) {
$lockFile = $lockFile ?: JoinService::acquireLock($email);
$context = ['provider' => 'stripe', 'trigger' => $lapseTrigger, 'event' => $event];
if (JoinService::shouldLapseMember($email, $context)) {
JoinService::toggleMemberLapsed($email, true, null, $context);
Expand All @@ -977,6 +989,8 @@ public static function handleWebhook($event)
} catch (\Exception $e) {
$c = $customerId ?: "(unknown)";
$joinBlockLog->error("Error handling Stripe webhook for customer $c: " . $e->getMessage());
} finally {
JoinService::releaseLock($lockFile);
}
}

Expand Down
Loading
Loading