
amazon web services - MSK Connector | MirrorMaker 2: Class not found - Stack Overflow


I'm trying to replicate a topic in MSK using MirrorMaker 2. It's worth noting that because this is Serverless, I don't have access to any instances to verify that the JAR is really present on the machine (classpath).

Basically, I want to completely change the topic name to XYZ, and I've been following a couple of people who tried this by simply implementing ReplicationPolicy. So I did the same:

package mypackage.rep;

import org.apache.kafka.connect.mirror.ReplicationPolicy;

public class CustomReplicationPolicy implements ReplicationPolicy {

    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return "XYZ";
    }

    public String topicSource(String topic) {
        return topic;
    }

    public String upstreamTopic(String topic) {
        return null;
    }
}

I also referenced it in the connector properties:

replication.policy.class=mypackage.rep.CustomReplicationPolicy

But whenever I try to deploy it, it seems that the connector does not recognize it in the custom plugin zip:

The connector configuration is invalid. Message: Connector configuration is invalid and contains the following 1 error(s): Invalid value mypackage.rep.CustomReplicationPolicy for configuration replication.policy.class: Class mypackage.rep.CustomReplicationPolicy could not be found.

Has anyone faced this?

EDIT: If anyone wants to know the output of jar tf connector.zip:

CustomReplicationPolicy-3.7.1.jar
__MACOSX/._CustomReplicationPolicy-3.7.1.jar
lib/
lib/CustomReplicationPolicy-3.7.1.jar
__MACOSX/lib/._CustomReplicationPolicy-3.7.1.jar

asked Mar 26 at 12:42 by Bombardi

1 Answer

Your approach is sound, but it won't work with managed Kafka Connect (MSK Connect). When you submit a JAR as a custom plugin, it is not added to the classpath of the Kafka Connect worker. It goes into a plugin directory, and each plugin gets its own classloader so that its dependencies stay isolated. This is how Kafka Connect workers load plugins.

So, this means that the MirrorSource Connector won't see the libraries and classes for a custom replication policy.
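To make the isolation point concrete, here is a minimal sketch in plain Java. It does not use Kafka Connect's actual plugin loader; it only assumes the general JVM rule that a classloader can see classes on its own search path and on its parent chain, and nothing else. The names IsolationSketch, PolicyStandIn, and canSee are hypothetical and exist only for this illustration.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class IsolationSketch {

    // Hypothetical stand-in for a class that lives in only one plugin's JAR.
    public static class PolicyStandIn {}

    // Returns true if the given loader can resolve the class by name.
    public static boolean canSee(ClassLoader loader, String className) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        String name = PolicyStandIn.class.getName();

        // The loader that actually loaded this class can find it.
        ClassLoader owner = IsolationSketch.class.getClassLoader();
        System.out.println("owning loader sees it:   " + canSee(owner, name));

        // A separate loader with an empty search path, parented to the
        // platform loader (Java 9+), has no route to the class.
        try (URLClassLoader isolated =
                 new URLClassLoader(new URL[0], ClassLoader.getPlatformClassLoader())) {
            System.out.println("isolated loader sees it: " + canSee(isolated, name));
        }
    }
}
```

The second lookup fails with ClassNotFoundException internally, which is the same kind of failure the connector validation reports when the policy class is not reachable from the loader doing the lookup.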

Alternatively, if you are just trying to replicate a topic under the same name, there is an IdentityReplicationPolicy available in Kafka 3. To use it, define your MM2 connector to run in MSK Connect using version 3.7.

EDIT (addresses complete change of a topic name):

You can use an SMT (Single Message Transform) to change the topic name. Since the question maps one specific name to another specific name, the regex can be a literal. Add this to the MirrorSource connector configuration:

# other properties...
"transforms":"changeABCtoXYZ",
"transforms.changeABCtoXYZ.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.changeABCtoXYZ.regex": "ABC",
"transforms.changeABCtoXYZ.replacement": "XYZ",
# other properties...
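
As a rough illustration of how the regex and replacement above behave, the sketch below reproduces the matching rule with plain java.util.regex and no Kafka dependency. It assumes, to the best of my understanding of RegexRouter, that the topic is renamed only when the pattern matches the whole topic name, and that the new name comes from Matcher.replaceFirst. The class RegexRouteSketch, the method route, and the alias "src" are hypothetical names used only here.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexRouteSketch {

    // Rename the topic only if the regex matches the entire name;
    // otherwise return the topic unchanged.
    public static String route(String regex, String replacement, String topic) {
        Matcher m = Pattern.compile(regex).matcher(topic);
        return m.matches() ? m.replaceFirst(replacement) : topic;
    }

    public static void main(String[] args) {
        // Exact name: renamed.
        System.out.println(route("ABC", "XYZ", "ABC"));           // XYZ
        // Only a partial match: left alone.
        System.out.println(route("ABC", "XYZ", "ABC-orders"));    // ABC-orders
        // A topic carrying a source-alias prefix needs a regex that covers it.
        System.out.println(route("src\\.ABC", "XYZ", "src.ABC")); // XYZ
    }
}
```

One thing worth checking in a real deployment: if the replication policy in use prefixes the topic with the source cluster alias, the transform may see the prefixed name rather than the bare one, in which case the regex has to match that prefixed form.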