Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

sentry / @nestjs/microservices   js

Repository URL to install this package:

Version: 7.0.10 

/ helpers / kafka-round-robin-partition-assigner.js

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
let kafkaPackage = {};
const time = process.hrtime();
class KafkaRoundRobinPartitionAssigner {
    constructor(config) {
        this.config = config;
        this.name = 'RoundRobinByTime';
        this.version = 1;
        kafkaPackage = load_package_util_1.loadPackage('kafkajs', KafkaRoundRobinPartitionAssigner.name, () => require('kafkajs'));
    }
    /**
     * This process can result in imbalanced assignments
     * @param {array} members array of members, e.g: [{ memberId: 'test-5f93f5a3' }]
     * @param {array} topics
     * @param {Buffer} userData
     * @returns {array} object partitions per topic per member
     */
    async assign(group) {
        const membersCount = group.members.length;
        const assignment = {};
        const sortedMembers = group.members
            .map(member => this.mapToTimeAndMemberId(member))
            .sort((a, b) => this.sortByTime(a, b))
            .map(member => member.memberId);
        sortedMembers.forEach(memberId => {
            assignment[memberId] = {};
        });
        const insertAssignmentsByTopic = (topic) => {
            const partitionMetadata = this.config.cluster.findTopicPartitionMetadata(topic);
            const partitions = partitionMetadata.map(m => m.partitionId);
            sortedMembers.forEach((memberId, i) => {
                if (!assignment[memberId][topic]) {
                    assignment[memberId][topic] = [];
                }
                assignment[memberId][topic].push(...partitions.filter(id => id % membersCount === i));
            });
        };
        group.topics.forEach(insertAssignmentsByTopic);
        return Object.keys(assignment).map(memberId => ({
            memberId,
            memberAssignment: kafkaPackage.AssignerProtocol.MemberAssignment.encode({
                version: this.version,
                assignment: assignment[memberId],
                userData: group.userData,
            }),
        }));
    }
    protocol(subscription) {
        const stringifiedTimeObject = JSON.stringify({
            time: this.getTime(),
        });
        subscription.userData = Buffer.from(stringifiedTimeObject);
        return {
            name: this.name,
            metadata: kafkaPackage.AssignerProtocol.MemberMetadata.encode({
                version: this.version,
                topics: subscription.topics,
                userData: subscription.userData,
            }),
        };
    }
    getTime() {
        return time;
    }
    mapToTimeAndMemberId(member) {
        const memberMetadata = kafkaPackage.AssignerProtocol.MemberMetadata.decode(member.memberMetadata);
        const memberUserData = JSON.parse(memberMetadata.userData.toString());
        return {
            memberId: member.memberId,
            time: memberUserData.time,
        };
    }
    sortByTime(a, b) {
        // if seconds are equal sort by nanoseconds
        if (a.time[0] === b.time[0]) {
            return a.time[1] - b.time[1];
        }
        // sort by seconds
        return a.time[0] - b.time[0];
    }
}
exports.KafkaRoundRobinPartitionAssigner = KafkaRoundRobinPartitionAssigner;