Weźmy sobie konstruktory takiego KafkaConsumer
. Inicjowanie różnych pól napisane jest ciągiem, wielkie traje (try
) na kilkadziesiąt linijek. Czy to nie powinno być wydzielone chociaż do prywatnych metod?
public KafkaConsumer(Map<String, Object> configs) {
this((Map)configs, (Deserializer)null, (Deserializer)null);
}
public KafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)), keyDeserializer, valueDeserializer);
}
public KafkaConsumer(Properties properties) {
this((Properties)properties, (Deserializer)null, (Deserializer)null);
}
public KafkaConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)), keyDeserializer, valueDeserializer);
}
private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
this.closed = false;
this.currentThread = new AtomicLong(-1L);
this.refcount = new AtomicInteger(0);
try {
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config, ProtocolType.CONSUMER);
this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
this.clientId = buildClientId(config.getString("client.id"), groupRebalanceConfig);
LogContext logContext;
if (groupRebalanceConfig.groupInstanceId.isPresent()) {
logContext = new LogContext("[Consumer instanceId=" + (String)groupRebalanceConfig.groupInstanceId.get() + ", clientId=" + this.clientId + ", groupId=" + (String)this.groupId.orElse("null") + "] ");
} else {
logContext = new LogContext("[Consumer clientId=" + this.clientId + ", groupId=" + (String)this.groupId.orElse("null") + "] ");
}
this.log = logContext.logger(this.getClass());
boolean enableAutoCommit = config.getBoolean("enable.auto.commit");
if (!this.groupId.isPresent()) {
if (!config.originals().containsKey("enable.auto.commit")) {
enableAutoCommit = false;
} else if (enableAutoCommit) {
throw new InvalidConfigurationException("enable.auto.commit cannot be set to true when default group id (null) is used.");
}
} else if (((String)this.groupId.get()).isEmpty()) {
this.log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
}
this.log.debug("Initializing the Kafka consumer");
this.requestTimeoutMs = (long)config.getInt("request.timeout.ms");
this.defaultApiTimeoutMs = config.getInt("default.api.timeout.ms");
this.time = Time.SYSTEM;
this.metrics = buildMetrics(config, this.time, this.clientId);
this.retryBackoffMs = config.getLong("retry.backoff.ms");
Map<String, Object> userProvidedConfigs = config.originals();
userProvidedConfigs.put("client.id", this.clientId);
List<ConsumerInterceptor<K, V>> interceptorList = (new ConsumerConfig(userProvidedConfigs, false)).getConfiguredInstances("interceptor.classes", ConsumerInterceptor.class);
this.interceptors = new ConsumerInterceptors(interceptorList);
if (keyDeserializer == null) {
this.keyDeserializer = (Deserializer)config.getConfiguredInstance("key.deserializer", Deserializer.class);
this.keyDeserializer.configure(config.originals(), true);
} else {
config.ignore("key.deserializer");
this.keyDeserializer = keyDeserializer;
}
if (valueDeserializer == null) {
this.valueDeserializer = (Deserializer)config.getConfiguredInstance("value.deserializer", Deserializer.class);
this.valueDeserializer.configure(config.originals(), false);
} else {
config.ignore("value.deserializer");
this.valueDeserializer = valueDeserializer;
}
OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString("auto.offset.reset").toUpperCase(Locale.ROOT));
this.subscriptions = new SubscriptionState(logContext, offsetResetStrategy);
ClusterResourceListeners clusterResourceListeners = this.configureClusterResourceListeners(keyDeserializer, valueDeserializer, this.metrics.reporters(), interceptorList);
this.metadata = new ConsumerMetadata(this.retryBackoffMs, config.getLong("metadata.max.age.ms"), !config.getBoolean("exclude.internal.topics"), config.getBoolean("allow.auto.create.topics"), this.subscriptions, logContext, clusterResourceListeners);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList("bootstrap.servers"), config.getString("client.dns.lookup"));
this.metadata.bootstrap(addresses);
String metricGrpPrefix = "consumer";
FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry(Collections.singleton("client-id"), metricGrpPrefix);
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, this.time, logContext);
IsolationLevel isolationLevel = IsolationLevel.valueOf(config.getString("isolation.level").toUpperCase(Locale.ROOT));
Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(this.metrics, metricsRegistry);
int heartbeatIntervalMs = config.getInt("heartbeat.interval.ms");
ApiVersions apiVersions = new ApiVersions();
NetworkClient netClient = new NetworkClient(new Selector(config.getLong("connections.max.idle.ms"), this.metrics, this.time, metricGrpPrefix, channelBuilder, logContext), this.metadata, this.clientId, 100, config.getLong("reconnect.backoff.ms"), config.getLong("reconnect.backoff.max.ms"), config.getInt("send.buffer.bytes"), config.getInt("receive.buffer.bytes"), config.getInt("request.timeout.ms"), ClientDnsLookup.forConfig(config.getString("client.dns.lookup")), this.time, true, apiVersions, throttleTimeSensor, logContext);
this.client = new ConsumerNetworkClient(logContext, netClient, this.metadata, this.time, this.retryBackoffMs, config.getInt("request.timeout.ms"), heartbeatIntervalMs);
this.assignors = PartitionAssignorAdapter.getAssignorInstances(config.getList("partition.assignment.strategy"), config.originals());
this.coordinator = !this.groupId.isPresent() ? null : new ConsumerCoordinator(groupRebalanceConfig, logContext, this.client, this.assignors, this.metadata, this.subscriptions, this.metrics, metricGrpPrefix, this.time, enableAutoCommit, config.getInt("auto.commit.interval.ms"), this.interceptors);
this.fetcher = new Fetcher(logContext, this.client, config.getInt("fetch.min.bytes"), config.getInt("fetch.max.bytes"), config.getInt("fetch.max.wait.ms"), config.getInt("max.partition.fetch.bytes"), config.getInt("max.poll.records"), config.getBoolean("check.crcs"), config.getString("client.rack"), this.keyDeserializer, this.valueDeserializer, this.metadata, this.subscriptions, this.metrics, metricsRegistry, this.time, this.retryBackoffMs, this.requestTimeoutMs, isolationLevel, apiVersions);
this.kafkaConsumerMetrics = new KafkaConsumerMetrics(this.metrics, metricGrpPrefix);
config.logUnused();
AppInfoParser.registerAppInfo("kafka.consumer", this.clientId, this.metrics, this.time.milliseconds());
this.log.debug("Kafka consumer initialized");
} catch (Throwable var20) {
if (this.log != null) {
this.close(0L, true);
}
throw new KafkaException("Failed to construct kafka consumer", var20);
}
}
KafkaConsumer(LogContext logContext, String clientId, ConsumerCoordinator coordinator, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Fetcher<K, V> fetcher, ConsumerInterceptors<K, V> interceptors, Time time, ConsumerNetworkClient client, Metrics metrics, SubscriptionState subscriptions, ConsumerMetadata metadata, long retryBackoffMs, long requestTimeoutMs, int defaultApiTimeoutMs, List<ConsumerPartitionAssignor> assignors, String groupId) {
this.closed = false;
this.currentThread = new AtomicLong(-1L);
this.refcount = new AtomicInteger(0);
this.log = logContext.logger(this.getClass());
this.clientId = clientId;
this.coordinator = coordinator;
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
this.fetcher = fetcher;
this.interceptors = (ConsumerInterceptors)Objects.requireNonNull(interceptors);
this.time = time;
this.client = client;
this.metrics = metrics;
this.subscriptions = subscriptions;
this.metadata = metadata;
this.retryBackoffMs = retryBackoffMs;
this.requestTimeoutMs = requestTimeoutMs;
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
this.assignors = assignors;
this.groupId = Optional.ofNullable(groupId);
this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
}
99xmarcin