View Issue Details

IDProjectCategoryView StatusLast Update
0000013KafkaEsqueImprovementpublic2018-11-26 14:21
ReporterpkleindlAssigned Topatschuh 
PrioritynormalSeverityminorReproducibilityhave not tried
Status resolvedResolutionfixed 
Product Version 
Target VersionFixed in Version0.12.0 
Summary0000013: Key Search für Partition
Description

Limit searching for records by key to the partition the records belong to.
Basic code see below.

Steps To Reproduce

public class PartitionFinder {

public static final String TM_TRANSPORT_PLAN_INTERNAL = "tm.transportPlan.internal";
public static final String KEY = "\"1989e567-b6d1-fc12-910c-f027bc56d7f0\"";
private static Logger logger = LoggerFactory.getLogger(PartitionFinder.class);

public static void main(String[] args) throws Exception {

    String bootstrapServers = "widmpp01.lkw-walter.com:9092";

    Properties config = new Properties();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    AdminClient adminClient = AdminClient.create(config);

    DescribeClusterResult describeClusterResult = adminClient.describeCluster();
    System.out.println(describeClusterResult.clusterId().get());
    System.out.println(describeClusterResult.nodes().get());

    List<TopicPartitionInfo> topicPartitionInfo =
    adminClient.describeTopics(List.of(TM_TRANSPORT_PLAN_INTERNAL)).values().get(TM_TRANSPORT_PLAN_INTERNAL).get().partitions();
    List<PartitionInfo> partitionInfos = topicPartitionInfo.stream().map(new Function<TopicPartitionInfo, PartitionInfo>() {
                                                                             @Override
                                                                             public PartitionInfo apply(TopicPartitionInfo topicPartitionInfo) {
                                                                                 PartitionInfo partitionInfo = new PartitionInfo(
                                                                                         TM_TRANSPORT_PLAN_INTERNAL,topicPartitionInfo.partition(),topicPartitionInfo.leader(),topicPartitionInfo.replicas().toArray(new Node[]{}),topicPartitionInfo.isr().toArray(new Node[]{})
                                                                                 );
                                                                                 return partitionInfo;
                                                                             }

                                                                             @Override
                                                                             public <V> Function<V, PartitionInfo> compose(Function<? super V, ? extends TopicPartitionInfo> before) {
                                                                                 return null;
                                                                             }

                                                                             @Override
                                                                             public <V> Function<TopicPartitionInfo, V> andThen(Function<? super PartitionInfo, ? extends V> after) {
                                                                                 return null;
                                                                             }
                                                                         }).collect(Collectors.toList());

    System.out.println(partitionInfos.toString());

    Cluster cluster = new Cluster(describeClusterResult.clusterId().get(),describeClusterResult.nodes().get(),partitionInfos,new HashSet<>(),new HashSet<>(),null);

    Partitioner partitioner = new DefaultPartitioner();

    System.out.println(cluster.toString() + " : " + cluster.partitionCountForTopic(TM_TRANSPORT_PLAN_INTERNAL));
    int partition = partitioner.partition(TM_TRANSPORT_PLAN_INTERNAL, KEY, KEY.getBytes(),null,null,cluster);

    System.out.println(partition);

    System.exit(0);
}

}

TagsNo tags attached.

Activities

pkleindl

pkleindl

2018-11-26 10:05

reporter   ~0000007

Nachtrag: Ev. Abkürzung: https://stackoverflow.com/questions/50164566/how-to-check-which-partition-is-a-key-assign-to-in-kafka
Damit sollte die Anzahl an Partitions reichen.

patschuh

patschuh

2018-11-26 14:21

developer   ~0000009

Added an option to the trace key dialog, limiting the trace process to one partition

Related Changesets

KafkaEsque: master e572894b

2018-11-26 14:15:51

patschuh

Details Diff
[ESQUE-13]: Add "use fast trace" option to trace key;
apply stylesheet and icon to all dialogs;
minor refactorings
Affected Issues
0000013

Issue History

Date Modified Username Field Change
2018-11-26 08:32 pkleindl New Issue
2018-11-26 10:05 pkleindl Note Added: 0000007
2018-11-26 12:57 patschuh Assigned To => patschuh
2018-11-26 12:57 patschuh Status new => assigned
2018-11-26 14:16 patschuh Changeset attached => KafkaEsque master e572894b
2018-11-26 14:21 patschuh Status assigned => resolved
2018-11-26 14:21 patschuh Resolution open => fixed
2018-11-26 14:21 patschuh Fixed in Version => 0.12.0
2018-11-26 14:21 patschuh Note Added: 0000009