"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
// Handle to the kafkajs module, shared by every assigner instance in this file.
// Starts as an empty placeholder and is overwritten with the loaded package
// each time a KafkaRoundRobinPartitionAssigner is constructed.
let kafkaPackage = {};
// High-resolution timestamp captured once, when this module is first loaded.
// It is what getTime() returns, so it is the value advertised in the protocol
// metadata and later used to order group members during assignment.
const time = process.hrtime();
/**
 * Partition assigner that spreads each topic's partitions round-robin over
 * the members of a consumer group. Members are ordered by the timestamp each
 * of them advertised in its protocol metadata (see `protocol`).
 */
class KafkaRoundRobinPartitionAssigner {
    /**
     * @param {object} config assigner configuration; `config.cluster` must
     * expose `findTopicPartitionMetadata(topic)`, which `assign` calls
     */
    constructor(config) {
        this.config = config;
        this.name = 'RoundRobinByTime';
        this.version = 1;
        // Loads kafkajs through the Nest helper and stores it in the module-level handle.
        kafkaPackage = load_package_util_1.loadPackage('kafkajs', KafkaRoundRobinPartitionAssigner.name, () => require('kafkajs'));
    }
    /**
     * Computes the partition assignment for every member of the group.
     * This process can result in imbalanced assignments.
     * @param {object} group
     * @param {array} group.members array of members, e.g:
     * [{ memberId: 'test-5f93f5a3', memberMetadata: <encoded metadata> }]
     * @param {array} group.topics topic names to distribute
     * @param {Buffer} [group.userData] passed through unchanged into every
     * encoded member assignment
     * @returns {Promise<array>} one `{ memberId, memberAssignment }` entry per
     * member, where `memberAssignment` is the encoded partitions-per-topic map
     */
    async assign(group) {
        const membersCount = group.members.length;
        const sortedMembers = group.members
            .map(member => this.mapToTimeAndMemberId(member))
            .sort((a, b) => this.sortByTime(a, b))
            .map(member => member.memberId);
        // memberId -> { topic -> [partitionId, ...] }
        const assignment = {};
        for (const memberId of sortedMembers) {
            assignment[memberId] = {};
        }
        for (const topic of group.topics) {
            const partitions = this.config.cluster
                .findTopicPartitionMetadata(topic)
                .map(metadata => metadata.partitionId);
            sortedMembers.forEach((memberId, position) => {
                const topicsOfMember = assignment[memberId];
                // Every member gets an entry for the topic, even when it ends up with no partitions.
                if (!topicsOfMember[topic]) {
                    topicsOfMember[topic] = [];
                }
                // The member at sorted position `position` owns the partitions whose id is
                // congruent to that position modulo the number of members.
                topicsOfMember[topic].push(...partitions.filter(id => id % membersCount === position));
            });
        }
        return Object.keys(assignment).map(memberId => ({
            memberId,
            memberAssignment: kafkaPackage.AssignerProtocol.MemberAssignment.encode({
                version: this.version,
                assignment: assignment[memberId],
                userData: group.userData,
            }),
        }));
    }
    /**
     * Builds the protocol entry this member advertises when joining the group.
     * The metadata's user data is a JSON document `{ time: <getTime()> }`; any
     * `userData` supplied on `subscription` is ignored.
     * @param {object} subscription
     * @param {array} subscription.topics topics the member subscribes to
     * @returns {object} `{ name, metadata }` with `metadata` being the encoded member metadata
     */
    protocol(subscription) {
        // Kept in a local instead of being written onto `subscription`: the caller's
        // object is left untouched, and frozen/read-only subscriptions are accepted.
        const userData = Buffer.from(JSON.stringify({ time: this.getTime() }));
        return {
            name: this.name,
            metadata: kafkaPackage.AssignerProtocol.MemberMetadata.encode({
                version: this.version,
                topics: subscription.topics,
                userData,
            }),
        };
    }
    /**
     * @returns {array} the `process.hrtime()` tuple captured when this module was loaded
     */
    getTime() {
        return time;
    }
    /**
     * Decodes a member's metadata and extracts the timestamp from its JSON user data.
     * @param {object} member group member with `memberId` and encoded `memberMetadata`
     * @returns {object} `{ memberId, time }`
     * @throws {SyntaxError} when the decoded user data is not valid JSON
     */
    mapToTimeAndMemberId(member) {
        const memberMetadata = kafkaPackage.AssignerProtocol.MemberMetadata.decode(member.memberMetadata);
        const memberUserData = JSON.parse(memberMetadata.userData.toString());
        return {
            memberId: member.memberId,
            time: memberUserData.time,
        };
    }
    /**
     * Comparator ordering two `{ time: [seconds, nanoseconds] }` entries ascending.
     * @param {object} a
     * @param {object} b
     * @returns {number} negative when `a` comes first, positive when `b` does, 0 on a tie
     */
    sortByTime(a, b) {
        // if seconds are equal sort by nanoseconds
        if (a.time[0] === b.time[0]) {
            return a.time[1] - b.time[1];
        }
        // sort by seconds
        return a.time[0] - b.time[0];
    }
}
exports.KafkaRoundRobinPartitionAssigner = KafkaRoundRobinPartitionAssigner;