Source: policies/load-balancing.js

"use strict";

const types = require("../types");
const utils = require("../utils.js");
const errors = require("../errors.js");
const { throwNotSupported } = require("../new-utils.js");

const newlyUpInterval = 60000;

/** @module policies/loadBalancing */
/**
 * Base class for Load Balancing Policies
 */
class LoadBalancingPolicy {
    constructor() {}
    /**
     * Initializes the load balancing policy, called after the driver has obtained the cluster information.
     * @param {Client} client
     * @param {HostMap} hosts
     * @param {Function} callback
     */
    init(client, hosts, callback) {
        this.client = client;
        this.hosts = hosts;
        callback();
    }
    /**
     * Returns the distance assigned by this policy to the provided host.
     * @param {Host} host
     */
    getDistance(host) {
        return types.distance.local;
    }
    /**
     * Returns an iterator with the hosts for a new query.
     * Each new query will call this method. The first host in the result will
     * then be used to perform the query.
     * @param {String} keyspace Name of the keyspace the `Client` is currently logged into.
     * @param {ExecutionOptions|null} executionOptions The information related to the execution of the request.
     * @param {Function} callback The function to be invoked with the error as the first parameter and the host
     * iterator as the second parameter.
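     *
     * A hedged sketch of what a custom implementation could pass to the callback, assuming `this.hosts`
     * was populated by `init()`. This only illustrates the contract; the base class itself reports an error.
     * @example
     * newQueryPlan(keyspace, executionOptions, callback) {
     *     // Yield every known host; HostMap#values() returns an Array elsewhere in this module.
     *     callback(null, this.hosts.values()[Symbol.iterator]());
     * }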
     */
    newQueryPlan(keyspace, executionOptions, callback) {
        callback(
            new Error(
                "You must implement a query plan for the LoadBalancingPolicy class",
            ),
        );
    }
    /**
     * Gets an associative array containing the policy options.
     */
    getOptions() {
        return new Map();
    }

    /**
     * @returns {LoadBalancingConfig}
     * @package
     */
    getRustConfiguration() {
        // This error will be thrown by all policies that do not override this method.
        throw new Error(
            "Currently this load balancing policy is not supported by the driver",
        );
    }
}

class LoadBalancingRustImplemented extends LoadBalancingPolicy {
    constructor() {
        super();
        this.errorMsg =
            "This load balancing policy is implemented in Rust. " +
            "Using this policy from JavaScript, or inheriting from this class " +
            "in order to create custom policies is not supported.";
    }
    init(client, hosts, callback) {
        throwNotSupported(this.errorMsg);
    }
    getDistance(host) {
        throwNotSupported(this.errorMsg);
    }
    newQueryPlan(keyspace, executionOptions, callback) {
        throwNotSupported(this.errorMsg);
    }
}

/**
 * This policy yields nodes in a round-robin fashion.
 * @extends LoadBalancingPolicy
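 *
 * A minimal usage sketch, assuming the policy is passed through the Client `policies.loadBalancing`
 * option (the `Client` import and `contactPoints` value are illustrative assumptions):
 * @example
 * const client = new Client({
 *     contactPoints: ["127.0.0.1"],
 *     policies: { loadBalancing: new RoundRobinPolicy() },
 * });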
 */
class RoundRobinPolicy extends LoadBalancingRustImplemented {
    constructor() {
        super();
        this.index = 0;
    }
    /**
     * @returns {LoadBalancingConfig}
     */
    getRustConfiguration() {
        return {
            tokenAware: false,
        };
    }
}

/**
 * A data-center aware Round-robin load balancing policy.
 * This policy provides round-robin queries over the nodes of the local
 * data center.
 * @extends {LoadBalancingPolicy}
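 *
 * A minimal usage sketch; the data center name is an illustrative placeholder and the policy is
 * assumed to be passed through the Client `policies.loadBalancing` option:
 * @example
 * const policy = new DCAwareRoundRobinPolicy("datacenter1");
 * const client = new Client({
 *     contactPoints: ["127.0.0.1"],
 *     policies: { loadBalancing: policy },
 * });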
 */
class DCAwareRoundRobinPolicy extends LoadBalancingRustImplemented {
    /**
     * @param {?String} [localDc] local data center name. This value overrides the 'localDataCenter' Client option
     * and is useful for cases where you have multiple execution profiles that you intend to use for routing
     * requests to different data centers.
     */
    constructor(localDc) {
        super();
        this.localDc = localDc;
    }
    /**
     * Gets an associative array containing the policy options.
     */
    getOptions() {
        return new Map([["localDataCenter", this.localDc]]);
    }

    /**
     * @returns {LoadBalancingConfig}
     */
    getRustConfiguration() {
        return {
            preferDatacenter: this.localDc,
            permitDcFailover: false,
            tokenAware: false,
        };
    }
}

/**
 * A wrapper load balancing policy that adds token awareness to a child policy.
 * @extends LoadBalancingPolicy
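 *
 * A minimal sketch of wrapping a data-center aware child policy (the data center name is an
 * illustrative placeholder):
 * @example
 * const policy = new TokenAwarePolicy(new DCAwareRoundRobinPolicy("datacenter1"));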
 */
class TokenAwarePolicy extends LoadBalancingRustImplemented {
    /**
     * @param {LoadBalancingPolicy} childPolicy
     */
    constructor(childPolicy) {
        super();
        if (!childPolicy) {
            throw new Error("You must specify a child load balancing policy");
        }
        this.childPolicy = childPolicy;
    }
    /**
     * Gets an associative array containing the policy options.
     */
    getOptions() {
        const map = new Map([
            [
                "childPolicy",
                this.childPolicy.constructor !== undefined
                    ? this.childPolicy.constructor.name
                    : null,
            ],
        ]);

        if (this.childPolicy instanceof DCAwareRoundRobinPolicy) {
            map.set("localDataCenter", this.childPolicy.localDc);
        }

        return map;
    }

    /**
     * @returns {LoadBalancingConfig}
     */
    getRustConfiguration() {
        let options = this.childPolicy.getRustConfiguration();
        options.tokenAware = true;
        return options;
    }
}

/**
 * A load balancing policy wrapper that ensures that only hosts from a provided
 * allow list will ever be returned.
 *
 * This policy wraps another load balancing policy and delegates the choice
 * of hosts to the wrapped policy, with the exception that only hosts contained
 * in the allow list provided when constructing this policy will ever be
 * returned. Any host not in the allow list will be considered ignored
 * and thus will not be connected to.
 *
 * This policy can be useful to ensure that the driver only connects to a
 * predefined set of hosts. Keep in mind, however, that this policy somewhat
 * defeats the driver's host auto-detection. As such, it is only useful in a
 * few special cases or for testing, but is not optimal in general.
 * If all you want to do is limit connections to hosts in the local
 * data center, you should use DCAwareRoundRobinPolicy and *not* this policy.
 *
 * @extends LoadBalancingPolicy
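 *
 * A minimal sketch (the addresses are illustrative placeholders):
 * @example
 * const allowed = ["10.0.0.1:9042", "10.0.0.2:9042"];
 * const policy = new AllowListPolicy(new RoundRobinPolicy(), allowed);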
 */
class AllowListPolicy extends LoadBalancingRustImplemented {
    /**
     * Create a new policy that wraps the provided child policy but only "allow" hosts
     * from the provided list.
     * @param {LoadBalancingPolicy} childPolicy the wrapped policy.
     * If the child policy filters some of the hosts out, only hosts present
     * in the allow list and accepted by the child policy will be contacted by the driver.
     * @param {Array.<string>} allowList The host addresses in the format ipAddress:port.
     * Only hosts from this list may get connected
     * to (whether they will get connected to or not depends on the child policy).
     */
    constructor(childPolicy, allowList) {
        super();
        if (!childPolicy) {
            throw new Error("You must specify a child load balancing policy");
        }
        if (!Array.isArray(allowList)) {
            throw new Error(
                "You must provide the list of allowed host addresses",
            );
        }

        this.childPolicy = childPolicy;
        this.allowList = allowList;
    }

    /**
     * Gets an associative array containing the policy options.
     */
    getOptions() {
        return new Map([
            [
                "childPolicy",
                this.childPolicy.constructor !== undefined
                    ? this.childPolicy.constructor.name
                    : null,
            ],
            ["allowList", this.allowList],
        ]);
    }

    /**
     * @returns {LoadBalancingConfig}
     */
    getRustConfiguration() {
        let options = this.childPolicy.getRustConfiguration();
        if (options.allowList) {
            // In case some other policy provided an allow list, we take the intersection
            // to mimic the original behavior of this policy
            options.allowList = this.allowList.filter(function (n) {
                return options.allowList.indexOf(n) !== -1;
            });
        } else {
            options.allowList = this.allowList;
        }
        return options;
    }
}

/**
 * A load-balancing policy implementation that attempts to fairly distribute the load based on the number of in-flight
 * requests per host. The local replicas are initially shuffled and
 * <a href="https://www.eecs.harvard.edu/~michaelm/postscripts/mythesis.pdf">between the first two nodes in the
 * shuffled list, the one with fewer in-flight requests is selected as coordinator</a>.
 *
 * Additionally, it detects unresponsive replicas and reorders them to the back of the query plan.
 *
 * For graph analytics queries, it uses the preferred analytics graph server previously obtained by the driver as the
 * first host in the query plan.
 */
class LegacyDefaultLoadBalancingPolicy extends LoadBalancingPolicy {
    /**
     * Creates a new instance of `LegacyDefaultLoadBalancingPolicy`.
     * @param {String|Object} [options] The local data center name or the optional policy options object.
     *
     * Note that when providing the local data center name, it overrides the `localDataCenter` option at
     * `Client` level.
     * @param {String} [options.localDc] local data center name. This value overrides the 'localDataCenter' Client option
     * and is useful for cases where you have multiple execution profiles that you intend to use for routing
     * requests to different data centers.
     * @param {Function} [options.filter] A function to apply to determine if hosts are included in the query plan.
     * The function takes a Host parameter and returns a Boolean.
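     *
     * A minimal sketch of both accepted forms (the data center name and filter are illustrative placeholders):
     * @example
     * // Passing only the local data center name
     * const policy = new LegacyDefaultLoadBalancingPolicy("datacenter1");
     * // Passing an options object with a host filter
     * const filtered = new LegacyDefaultLoadBalancingPolicy({
     *     localDc: "datacenter1",
     *     filter: (host) => host.datacenter === "datacenter1",
     * });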
     */
    constructor(options) {
        super();

        if (typeof options === "string") {
            options = { localDc: options };
        } else if (!options) {
            options = utils.emptyObject;
        }

        this._client = null;
        this._hosts = null;
        this._filteredHosts = null;
        this._preferredHost = null;
        this._index = 0;
        this.localDc = options.localDc;
        this._filter = options.filter || this._defaultFilter;

        // Allow some checks to be injected
        if (options.isHostNewlyUp) {
            this._isHostNewlyUp = options.isHostNewlyUp;
        }
        if (options.healthCheck) {
            this._healthCheck = options.healthCheck;
        }
        if (options.compare) {
            this._compare = options.compare;
        }
        if (options.getReplicas) {
            this._getReplicas = options.getReplicas;
        }
    }

    /**
     * Initializes the load balancing policy, called after the driver has obtained the cluster information.
     * @param {Client} client
     * @param {HostMap} hosts
     * @param {Function} callback
     */
    init(client, hosts, callback) {
        this._client = client;
        this._hosts = hosts;

        // Clean local host cache
        this._hosts.on("add", () => (this._filteredHosts = null));
        this._hosts.on("remove", () => (this._filteredHosts = null));

        try {
            setLocalDc(this, client, this._hosts);
        } catch (err) {
            return callback(err);
        }

        callback();
    }

    /**
     * Returns the distance assigned by this policy to the provided host, relative to the client instance.
     * @param {Host} host
     */
    getDistance(host) {
        if (this._preferredHost !== null && host === this._preferredHost) {
            // Set the last preferred host as local.
            // It ensures that the pool for the graph analytics host has the appropriate size
            return types.distance.local;
        }

        if (!this._filter(host)) {
            return types.distance.ignored;
        }

        return host.datacenter === this.localDc
            ? types.distance.local
            : types.distance.ignored;
    }

    /**
     * Returns a host iterator to be used for a query execution.
     * @override
     * @param {String} keyspace
     * @param {ExecutionOptions} executionOptions
     * @param {Function} callback
     */
    newQueryPlan(keyspace, executionOptions, callback) {
        let routingKey;
        let preferredHost;

        if (executionOptions) {
            routingKey = executionOptions.getRoutingKey();

            if (executionOptions.getKeyspace()) {
                keyspace = executionOptions.getKeyspace();
            }

            preferredHost = executionOptions.getPreferredHost();
        }

        let iterable;

        if (!keyspace || !routingKey) {
            iterable = this._getLocalHosts();
        } else {
            iterable = this._getReplicasAndLocalHosts(keyspace, routingKey);
        }

        if (preferredHost) {
            // Set it on an instance level field to set the distance
            this._preferredHost = preferredHost;
            iterable = LegacyDefaultLoadBalancingPolicy._getPreferredHostFirst(
                preferredHost,
                iterable,
            );
        }

        return callback(null, iterable);
    }

    /**
     * Yields the preferred host first, followed by the hosts in the provided iterable
     * @param preferredHost
     * @param iterable
     * @private
     */
    static *_getPreferredHostFirst(preferredHost, iterable) {
        yield preferredHost;

        for (const host of iterable) {
            if (host !== preferredHost) {
                yield host;
            }
        }
    }

    /**
     * Yields the local hosts without the replicas already yielded
     * @param {Array<Host>} [localReplicas] The local replicas that should not be yielded again
     * @private
     */
    *_getLocalHosts(localReplicas) {
        // Use a local reference
        const hosts = this._getFilteredLocalHosts();
        const initialIndex = this._getIndex();

        // indexOf() over an Array is an O(n) operation, but given that there should be 3 to 7 replicas,
        // it shouldn't be an expensive call. Additionally, as this generator is lazy, this check only runs
        // once the local replicas have been exhausted.
        const canBeYield = localReplicas
            ? (h) => localReplicas.indexOf(h) === -1
            : (h) => true;

        for (let i = 0; i < hosts.length; i++) {
            const h = hosts[(i + initialIndex) % hosts.length];
            if (canBeYield(h) && h.isUp()) {
                yield h;
            }
        }
    }

    _getReplicasAndLocalHosts(keyspace, routingKey) {
        let replicas = this._getReplicas(keyspace, routingKey);
        if (replicas === null) {
            return this._getLocalHosts();
        }

        const filteredReplicas = [];
        let newlyUpReplica = null;
        let newlyUpReplicaTimestamp = Number.MIN_SAFE_INTEGER;
        let unhealthyReplicas = 0;

        // Filter by DC, predicate and UP replicas
        // Use the same iteration to perform other checks: whether it is newly UP or unhealthy
        // As this is part of the hot path, we use a simple loop and avoid using Array.prototype.filter() + closure
        for (let i = 0; i < replicas.length; i++) {
            const h = replicas[i];
            if (
                !this._filter(h) ||
                h.datacenter !== this.localDc ||
                !h.isUp()
            ) {
                continue;
            }
            const isUpSince = this._isHostNewlyUp(h);
            if (isUpSince !== null && isUpSince > newlyUpReplicaTimestamp) {
                newlyUpReplica = h;
                newlyUpReplicaTimestamp = isUpSince;
            }
            if (newlyUpReplica === null && !this._healthCheck(h)) {
                unhealthyReplicas++;
            }
            filteredReplicas.push(h);
        }

        replicas = filteredReplicas;

        // Shuffle remaining local replicas
        utils.shuffleArray(replicas);

        if (replicas.length < 3) {
            // Avoid reordering replicas of a set of 2 as we could be doing more harm than good
            return this.yieldReplicasFirst(replicas);
        }

        let temp;

        if (newlyUpReplica === null) {
            if (
                unhealthyReplicas > 0 &&
                unhealthyReplicas < Math.floor(replicas.length / 2 + 1)
            ) {
                // There are one or more unhealthy replicas and there is a majority of healthy replicas
                this._sendUnhealthyToTheBack(replicas, unhealthyReplicas);
            }
        } else if (
            (newlyUpReplica === replicas[0] ||
                newlyUpReplica === replicas[1]) &&
            Math.random() * 4 >= 1
        ) {
            // There is a newly UP replica and the replica in first or second position is the most recent replica
            // marked as UP and dice roll 1d4!=1 -> Send it to the back of the Array
            const index = newlyUpReplica === replicas[0] ? 0 : 1;
            temp = replicas[replicas.length - 1];
            replicas[replicas.length - 1] = replicas[index];
            replicas[index] = temp;
        }

        if (this._compare(replicas[1], replicas[0]) > 0) {
            // Power of two random choices
            temp = replicas[0];
            replicas[0] = replicas[1];
            replicas[1] = temp;
        }

        return this.yieldReplicasFirst(replicas);
    }

    /**
     * Yields the local replicas followed by the rest of the local nodes.
     * @param {Array<Host>} replicas The local replicas
     */
    *yieldReplicasFirst(replicas) {
        for (let i = 0; i < replicas.length; i++) {
            yield replicas[i];
        }
        yield* this._getLocalHosts(replicas);
    }

    _isHostNewlyUp(h) {
        return h.isUpSince !== null &&
            Date.now() - h.isUpSince < newlyUpInterval
            ? h.isUpSince
            : null;
    }

    /**
     * Returns a boolean determining whether the host is healthy.
     * A host is considered unhealthy when it has a significant number of requests queued (10 or more in-flight)
     * but is not responding to those requests.
     * @param {Host} h
     * @return {boolean}
     * @private
     */
    _healthCheck(h) {
        return !(h.getInFlight() >= 10 && h.getResponseCount() <= 1);
    }

    /**
     * Compares two hosts and returns 1 if the first host should be favored, otherwise -1.
     * @return {number}
     * @private
     */
    _compare(h1, h2) {
        return h1.getInFlight() < h2.getInFlight() ? 1 : -1;
    }

    _getReplicas(keyspace, routingKey) {
        return this._client.getReplicas(keyspace, routingKey);
    }

    /**
     * Returns an Array of hosts filtered by DC and predicate.
     * @returns {Array<Host>}
     * @private
     */
    _getFilteredLocalHosts() {
        if (this._filteredHosts === null) {
            this._filteredHosts = this._hosts
                .values()
                .filter(
                    (h) => this._filter(h) && h.datacenter === this.localDc,
                );
        }
        return this._filteredHosts;
    }

    _getIndex() {
        const result = this._index++;
        // Overflow protection
        if (this._index === 0x7fffffff) {
            this._index = 0;
        }
        return result;
    }

    _sendUnhealthyToTheBack(replicas, unhealthyReplicas) {
        let counter = 0;

        // Start from the back, move backwards and stop once all unhealthy replicas are at the back
        for (
            let i = replicas.length - 1;
            i >= 0 && counter < unhealthyReplicas;
            i--
        ) {
            const host = replicas[i];
            if (this._healthCheck(host)) {
                continue;
            }

            const targetIndex = replicas.length - 1 - counter;
            if (targetIndex !== i) {
                const temp = replicas[targetIndex];
                replicas[targetIndex] = host;
                replicas[i] = temp;
            }
            counter++;
        }
    }

    _defaultFilter() {
        return true;
    }

    /**
     * Gets an associative array containing the policy options.
     */
    getOptions() {
        return new Map([
            ["localDataCenter", this.localDc],
            ["filterFunction", this._filter !== this._defaultFilter],
        ]);
    }
}

/**
 * Validates and sets the local data center to be used.
 * @param {LoadBalancingPolicy} lbp
 * @param {Client} client
 * @param {HostMap} hosts
 * @private
 */
function setLocalDc(lbp, client, hosts) {
    if (!(lbp instanceof LoadBalancingPolicy)) {
        throw new errors.DriverInternalError(
            "LoadBalancingPolicy instance was not provided",
        );
    }

    if (client && client.options) {
        if (lbp.localDc && !client.options.localDataCenter) {
            client.log(
                "info",
                `Local data center '${lbp.localDc}' was provided as an argument to the load-balancing` +
                    ` policy. It is preferable to specify the local data center using 'localDataCenter' in Client` +
                    ` options instead when your application is targeting a single data center.`,
            );
        }

        // If localDc is unset, use value set in client options.
        lbp.localDc = lbp.localDc || client.options.localDataCenter;
    }

    const dcs = getDataCenters(hosts);

    if (!lbp.localDc) {
        throw new errors.ArgumentError(
            `'localDataCenter' is not defined in Client options and also was not specified in constructor.` +
                ` At least one is required. Available DCs are: [${Array.from(dcs)}]`,
        );
    }

    if (!dcs.has(lbp.localDc)) {
        throw new errors.ArgumentError(
            `Datacenter ${lbp.localDc} was not found. Available DCs are: [${Array.from(dcs)}]`,
        );
    }
}

/**
 * This load balancing policy replicates the behavior of the Rust Driver Default Load Balancing Policy.
 *
 * It can be configured to be datacenter-aware, rack-aware and token-aware.
 * When the policy is datacenter-aware, you can configure whether to allow datacenter failover
 * (sending queries to nodes from a remote datacenter).
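 *
 * A minimal construction sketch (the data center name is an illustrative placeholder; fields left
 * unset are assumed to fall back to the Rust driver defaults):
 * @example
 * const config = new LoadBalancingConfig();
 * config.preferDatacenter = "datacenter1";
 * config.tokenAware = true;
 * config.permitDcFailover = false;
 * const policy = new DefaultLoadBalancingPolicy(config);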
 */
class DefaultLoadBalancingPolicy extends LoadBalancingRustImplemented {
    /**
     * @type {LoadBalancingConfig}
     */
    #config;

    /**
     * @param {LoadBalancingConfig} [config]
     */
    constructor(config) {
        super();
        if (!config) {
            config = new LoadBalancingConfig();
        }
        this.#config = config;
    }
    /**
     * Gets an associative array containing the policy options.
     */
    getOptions() {
        throwNotSupported("Not implemented.");
    }

    /**
     * @returns {LoadBalancingConfig}
     * @package
     */
    getRustConfiguration() {
        return this.#config;
    }
}

/**
 * This class represents the options of the Rust driver's Default Load Balancing Policy.
 * These options will be used to configure that policy.
 * You can find more about this policy in the documentation:
 * https://rust-driver.docs.scylladb.com/stable/load-balancing/default-policy.html
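 *
 * A minimal sketch of a rack-aware configuration with an allow list (all values are illustrative
 * placeholders):
 * @example
 * const config = new LoadBalancingConfig();
 * config.preferDatacenter = "dc1";
 * config.preferRack = "rack1";
 * config.tokenAware = true;
 * config.enableShufflingReplicas = false;
 * config.allowList = ["10.0.0.1:9042", "10.0.0.2:9042"];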
 */
class LoadBalancingConfig {
    // Internal note: this documentation is based on the Rust Driver documentation.
    // When major changes are made there, remember to update this documentation accordingly.
    /**
     * Sets the datacenter to be preferred by this policy.
     *
     * Allows the load balancing policy to prioritize nodes based on their location.
     * When a preferred datacenter is set, the policy will treat nodes in that
     * datacenter as "local" nodes, and nodes in other datacenters as "remote" nodes.
     * This affects the order in which nodes are returned by the policy when
     * selecting replicas for read or write operations. If no preferred datacenter
     * is specified, the policy will treat all nodes as local nodes.
     *
     * When datacenter failover is disabled (`permitDcFailover` is set to false),
     * the default policy will only include local nodes in load balancing plans.
     * Remote nodes will be excluded, even if they are alive and available
     * to serve requests.
     *
     * @type {string?}
     */
    preferDatacenter;
    /**
     * This option cannot be used without setting `preferDatacenter`.
     *
     * `preferDatacenter` and `preferRack` set the datacenter and rack to be preferred by this policy.
     *
     * Allows the load balancing policy to prioritize nodes based on their location
     * as well as their availability zones in the preferred datacenter.
     *
     * When a preferred rack is set, the policy will first return replicas in the local rack
     * in the preferred datacenter, and then the other replicas in the datacenter.
     *
     * @type {string?}
     */
    preferRack;
    /**
     * Sets whether this policy is token-aware (balances load more consciously) or not.
     *
     * Token awareness refers to a mechanism by which the driver is aware
     * of the token range assigned to each node in the cluster. Tokens
     * are assigned to nodes to partition the data and distribute it
     * across the cluster.
     *
     * When a user wants to read or write data, the driver can use token awareness
     * to route the request to the correct node based on the token range of the data
     * being accessed. This can help to minimize network traffic and improve
     * performance by ensuring that the data is accessed locally as much as possible.
     *
     * In the case of `DefaultPolicy`, token awareness is enabled by default,
     * meaning that the policy will prefer to return alive local replicas
     * if the token is available. This means that if the client is requesting data
     * that falls within the token range of a particular node, the policy will try
     * to route the request to that node first, assuming it is alive and responsive.
     *
     * Token awareness can significantly improve the performance and scalability
     * of applications built on Scylla. By using token awareness, users can ensure
     * that data is accessed locally as much as possible, reducing network overhead
     * and improving throughput.
     *
     * @type {boolean?}
     */
    tokenAware;
    /**
     * Sets whether this policy permits datacenter failover, i.e. ever attempts
     * to send requests to nodes from a non-preferred datacenter.
     *
     * In the event of a datacenter outage or network failure, the nodes
     * in that datacenter may become unavailable, and clients may no longer
     * be able to access data stored on those nodes. To address this,
     * the `DefaultPolicy` supports datacenter failover, which allows routing
     * requests to nodes in other datacenters if the local nodes are unavailable.
     *
     * Datacenter failover can be enabled in `DefaultPolicy` by setting this flag.
     * When it is set, the policy will prefer to return alive remote replicas
     * if datacenter failover is permitted.
     *
     *  @type {boolean?}
     */
    permitDcFailover;
    /**
     * Sets whether this policy should shuffle replicas when token-awareness
     * is enabled. Shuffling can help distribute the load over replicas, but
     * can reduce the effectiveness of caching on the database side (e.g.
     * for reads).
     *
     * This option is enabled by default. If disabled, replicas will be chosen in a fixed random
     * order that is determined when the load balancing policy is created and does not change
     * over its lifetime.
     *
     * @type {boolean?}
     */
    enableShufflingReplicas;

    /**
     * The host addresses in the format ipAddress:port.
     * When the list is provided, only hosts from this list may get connected to
     * (whether they will get connected to or not depends on the other policy options).
     *
     * @type {Array<string>?}
     */
    allowList;
}

function getDataCenters(hosts) {
    return new Set(hosts.values().map((h) => h.datacenter));
}

module.exports = {
    AllowListPolicy,
    DCAwareRoundRobinPolicy,
    LegacyDefaultLoadBalancingPolicy,
    LoadBalancingPolicy,
    RoundRobinPolicy,
    TokenAwarePolicy,
    LoadBalancingConfig,
    DefaultLoadBalancingPolicy,
};