/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.naming.core;

import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.naming.misc.HttpClient;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.pojo.Subscribers;
import com.alibaba.nacos.naming.push.PushService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Looks up the subscribers of a service, either from the local {@link PushService}
 * only or aggregated across every member reported by the {@link ServerMemberManager}.
 */
@Service
public class SubscribeManager {
    /** Path (relative to the naming API root) queried on peer servers for their subscribers. */
    private static final String SUBSCRIBER_ON_SYNC_URL = "/service/subscribers";
    @Autowired
    private PushService pushService;
    @Autowired
    private ServerMemberManager memberManager;

    /** Returns the local subscribers using the push service's fuzzy client lookup. */
    private List<Subscriber> getSubscribersFuzzy(String serviceName, String namespaceId) {
        return this.pushService.getClientsFuzzy(serviceName, namespaceId);
    }

    /** Returns the local subscribers using the push service's exact client lookup. */
    private List<Subscriber> getSubscribers(String serviceName, String namespaceId) {
        return this.pushService.getClients(serviceName, namespaceId);
    }

    /**
     * Returns the subscribers of the given service.
     *
     * <p>Without aggregation, or when the member manager's server list holds at most one
     * entry, only the local fuzzy lookup is used. Otherwise every member is visited: the
     * local member is answered from the local push service, every other member is queried
     * over HTTP with {@code aggregation=false}. Members that answer with a non-200 status,
     * or whose response carries no subscriber list, are skipped. The merged result is
     * de-duplicated by {@link Subscriber#toString()}.
     *
     * @param serviceName service name to look up
     * @param namespaceId namespace of the service
     * @param aggregation whether to merge the subscribers known to all members
     * @return the subscribers found; an immutable empty list when aggregation finds none
     * @throws InterruptedException declared for callers; kept for interface compatibility
     */
    public List<Subscriber> getSubscribers(String serviceName, String namespaceId, boolean aggregation) throws InterruptedException {
        if (!aggregation) {
            return this.getSubscribersFuzzy(serviceName, namespaceId);
        }
        // A server list of size <= 1 leaves no peer to ask, so answer locally.
        if (this.memberManager.getServerList().size() <= 1) {
            return this.getSubscribersFuzzy(serviceName, namespaceId);
        }

        List<Subscriber> subscriberList = new ArrayList<Subscriber>();
        String localAddress = NetUtils.localServer();
        for (Member server : this.memberManager.allMembers()) {
            if (localAddress.equals(server.getAddress())) {
                // No HTTP round trip to ourselves.
                subscriberList.addAll(this.getSubscribersFuzzy(serviceName, namespaceId));
                continue;
            }

            // Built only for remote calls; aggregation=false stops the peer from fanning out again.
            HashMap<String, String> paramValues = new HashMap<String, String>(4);
            paramValues.put("serviceName", serviceName);
            paramValues.put("namespaceId", namespaceId);
            paramValues.put("aggregation", String.valueOf(Boolean.FALSE));

            String url = "http://" + server.getAddress() + ApplicationUtils.getContextPath() + "/v1/ns" + SUBSCRIBER_ON_SYNC_URL;
            HttpClient.HttpResult result = HttpClient.httpGet(url, new ArrayList<String>(), paramValues);
            if (200 != result.code) {
                continue;
            }
            Subscribers subscribers = (Subscribers)JacksonUtils.toObj((String)result.content, Subscribers.class);
            // An empty or malformed body must not abort the whole aggregation with an NPE.
            if (subscribers == null || subscribers.getSubscribers() == null) {
                continue;
            }
            subscriberList.addAll(subscribers.getSubscribers());
        }

        if (CollectionUtils.isEmpty(subscriberList)) {
            return Collections.emptyList();
        }
        return subscriberList.stream()
                .filter(SubscribeManager.distinctByKey(Subscriber::toString))
                .collect(Collectors.toList());
    }

    /**
     * Builds a stateful predicate that accepts an element only the first time its
     * extracted key is seen.
     *
     * <p>Seen keys are stored in a {@link ConcurrentHashMap}, so the extractor must not
     * return {@code null}.
     *
     * @param keyExtractor maps an element to the key used for de-duplication
     * @param <T> element type
     * @return predicate that is {@code true} for the first element with a given key
     */
    public static <T> Predicate<T> distinctByKey(Function<? super T, Object> keyExtractor) {
        ConcurrentHashMap<Object, Boolean> seen = new ConcurrentHashMap<Object, Boolean>(128);
        return object -> seen.putIfAbsent(keyExtractor.apply(object), Boolean.TRUE) == null;
    }
}

