I'm trying to replicate a topic in MSK using MirrorMaker 2. Because the setup is serverless, I don't have access to any instances to verify that the jar is really present on the machine (on the classpath).
Basically, I want to completely change the topic name to XYZ, and I've been following a couple of people who tried this by simply implementing ReplicationPolicy. So I did it:
package mypackage.rep;

import org.apache.kafka.connect.mirror.ReplicationPolicy;

public class CustomReplicationPolicy implements ReplicationPolicy {
    // Every replicated topic is written to the fixed name "XYZ" on the target cluster.
    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return "XYZ";
    }

    @Override
    public String topicSource(String topic) {
        return topic;
    }

    @Override
    public String upstreamTopic(String topic) {
        return null;
    }
}
I also referenced it in the properties:
replication.policy.class=mypackage.rep.CustomReplicationPolicy
But whenever I try to deploy it, it seems that the connector does not recognize it in the custom plugin zip:
The connector configuration is invalid. Message: Connector configuration is invalid and contains the following 1 error(s): Invalid value mypackage.rep.CustomReplicationPolicy for configuration replication.policy.class: Class mypackage.rep.CustomReplicationPolicy could not be found.
Has anyone faced this?
EDIT: In case anyone wants to see it, here is the output of jar tf connector.zip:
CustomReplicationPolicy-3.7.1.jar
__MACOSX/._CustomReplicationPolicy-3.7.1.jar
lib/
lib/CustomReplicationPolicy-3.7.1.jar
__MACOSX/lib/._CustomReplicationPolicy-3.7.1.jar
1 Answer
Your approach is sound, but it won't work with managed Kafka Connect. When you submit a JAR as a plugin, it doesn't get added to the classpath of the Kafka Connect worker. It goes into a plugin directory, and each plugin gets its own classloader to isolate its dependencies. This is how Kafka Connect workers load plugins.
This means that the MirrorSource connector won't see the libraries and classes of a custom replication policy.
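To illustrate the general idea of classloader isolation, here is a minimal sketch in plain Java. It is only an analogy and not Kafka Connect's actual plugin loader: the class name IsolationSketch and the helpers isolatedLoader and canLoad are made up for this example. It shows that a class one loader can resolve is invisible to another loader in the same JVM that does not have it on its search path, which is the same kind of "could not be found" failure.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical demo class; not part of Kafka or MSK Connect.
final class IsolationSketch {

    // A loader with no URLs and no application parent (null parent = bootstrap only).
    // Assumption: this stands in for a loader that does not know about a given plugin.
    static ClassLoader isolatedLoader() {
        return new URLClassLoader(new URL[0], null);
    }

    // Returns true if the given loader can resolve the class name.
    static boolean canLoad(ClassLoader loader, String className) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String self = IsolationSketch.class.getName();
        ClassLoader appLoader = IsolationSketch.class.getClassLoader();
        ClassLoader isolated = isolatedLoader();

        // The loader that defined this class can find it again.
        System.out.println("app loader sees it:      " + canLoad(appLoader, self));
        // The isolated loader cannot, even though the class exists in this JVM.
        System.out.println("isolated loader sees it: " + canLoad(isolated, self));
        // JDK core classes are still reachable through the bootstrap loader.
        System.out.println("isolated sees String:    " + canLoad(isolated, "java.lang.String"));
    }
}
```

The point is only that "the jar was uploaded" and "the class is visible to the code that looks it up" are two different things.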
Alternatively, if you are just trying to replicate with the same topic name, there is an IdentityReplicationPolicy available in Kafka 3. To use it, define your MM2 connector to run in MSK Connect using version 3.7.
EDIT (addresses complete change of a topic name):
You can use a Single Message Transform (SMT) to change a topic name. Since the question maps one specific name to another specific name, the regex can be a literal. Add this to the MirrorSource connector configuration:
# other properties...
"transforms": "changeABCtoXYZ",
"transforms.changeABCtoXYZ.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.changeABCtoXYZ.regex": "ABC",
"transforms.changeABCtoXYZ.replacement": "XYZ",
# other properties...
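To see what this transform does to topic names, here is a small stand-alone sketch that mimics the matching rule as I understand it: the regex must match the whole topic name, and only then is the replacement applied. The class RegexRouterSketch and its route method are hypothetical names for illustration; this is not the Kafka class itself, and the topic names other than ABC and XYZ are made up.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in that imitates the routing rule with java.util.regex only.
final class RegexRouterSketch {

    // Returns the rewritten topic name, or the original name when the regex
    // does not match the entire topic.
    static String route(String regex, String replacement, String topic) {
        Matcher matcher = Pattern.compile(regex).matcher(topic);
        // matches() requires a full-string match, so "ABC" does not touch "ABC-events".
        return matcher.matches() ? matcher.replaceFirst(replacement) : topic;
    }

    public static void main(String[] args) {
        System.out.println(route("ABC", "XYZ", "ABC"));        // prints XYZ
        System.out.println(route("ABC", "XYZ", "ABC-events")); // prints ABC-events
        System.out.println(route("ABC", "XYZ", "other"));      // prints other
    }
}
```

Because the match is on the whole name, a literal regex such as ABC only renames that one topic and leaves other mirrored topics alone.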