/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.naming.push;

import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.ClientInfo;
import com.alibaba.nacos.naming.push.DataSource;
import com.alibaba.nacos.naming.push.ServiceChangeEvent;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.codehaus.jackson.util.VersionUtil;
import org.javatuples.Pair;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

@Component
public class PushService
implements ApplicationContextAware,
ApplicationListener<ServiceChangeEvent> {
    /** Naming switches; read in this class for the push-enabled flag, cache windows and minimum client versions. */
    @Autowired
    private SwitchDomain switchDomain;
    /** Spring context, used by {@code serviceChanged} to publish {@code ServiceChangeEvent}s. */
    private ApplicationContext applicationContext;
    /** How long an ack may take before the receiver logs it as slow; also the delay before a retransmit is attempted. */
    private static final long ACK_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10L);
    /** Retry budget for one push; a push whose retry count exceeds this value is counted as failed. */
    private static final int MAX_RETRY_TIMES = 1;
    /** Pushes that were sent but not yet acknowledged, keyed by ack key ("host,port,lastRefTime"). */
    private static volatile ConcurrentMap<String, Receiver.AckEntry> ackMap = new ConcurrentHashMap<String, Receiver.AckEntry>();
    /** Subscribers per service: full service name -> (PushClient.toString() -> client). */
    private static ConcurrentMap<String, ConcurrentMap<String, PushClient>> clientMap = new ConcurrentHashMap<String, ConcurrentMap<String, PushClient>>();
    /** Wall-clock send time (System.currentTimeMillis()) of each outstanding push, keyed by ack key. */
    private static volatile ConcurrentMap<String, Long> udpSendTimeMap = new ConcurrentHashMap<String, Long>();
    /**
     * Milliseconds between send and ack for acknowledged pushes, keyed by ack key.
     * NOTE(review): nothing in this file removes entries; presumably an external reader drains it - verify.
     */
    public static volatile ConcurrentMap<String, Long> pushCostMap = new ConcurrentHashMap<String, Long>();
    /** Number of distinct pushes attempted. NOTE(review): plain int incremented from executor threads, so not atomic. */
    private static int totalPush = 0;
    /** Number of pushes given up on (retries exhausted or send error). NOTE(review): plain int, not atomic. */
    private static int failedPush = 0;
    /** Socket created by the static initializer; used both to send pushes and to receive acks. */
    private static DatagramSocket udpSocket;
    /** Scheduled push task per full service name; an entry is removed when its task finishes. */
    private static ConcurrentMap<String, Future> futureMap;

    /**
     * Receives the Spring application context and keeps it for later event publishing.
     *
     * @param applicationContext the context this bean runs in
     * @throws BeansException declared by the {@code ApplicationContextAware} contract; not thrown here
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Schedules a UDP push of the changed service to every subscribed client.
     *
     * <p>The push task runs one second after the event. For each live client it builds
     * (or reuses from a per-run cache) the payload, sends it via {@code udpPush}, and
     * drops clients that report themselves as zombies. The scheduled future is recorded
     * in {@code futureMap} under the full service name and removed again when the task
     * finishes, which is what {@code serviceChanged} uses to suppress duplicate events.
     *
     * @param event the change event carrying the affected service
     */
    @Override
    public void onApplicationEvent(ServiceChangeEvent event) {
        Service service = event.getService();
        String serviceName = service.getName();
        String namespaceId = service.getNamespaceId();
        String serviceKey = UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName);

        ScheduledFuture<?> future = GlobalExecutor.scheduleUdpSender(() -> {
            try {
                Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
                ConcurrentMap<String, PushClient> clients = clientMap.get(serviceKey);
                if (MapUtils.isEmpty(clients)) {
                    return;
                }

                // Payloads already built during this run: cache key -> (packet bytes, source data).
                Map<String, Pair<byte[], Map<String, Object>>> cache = new HashMap<>(16);
                // Shared by all clients of this run; it becomes part of every ack key.
                long lastRefTime = System.nanoTime();

                for (PushClient client : clients.values()) {
                    if (client.zombie()) {
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        clients.remove(client.toString());
                        continue;
                    }
                    Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());

                    String key = PushService.getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                    // Payload reuse is only attempted when the default push cache window is at least 20 seconds.
                    Pair<byte[], Map<String, Object>> cached =
                            this.switchDomain.getDefaultPushCacheMillis() >= 20000L ? cache.get(key) : null;
                    byte[] compressData = null;
                    Map<String, Object> data = null;
                    if (cached != null) {
                        compressData = cached.getValue0();
                        data = cached.getValue1();
                        Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                    }

                    Receiver.AckEntry ackEntry;
                    if (compressData != null) {
                        ackEntry = PushService.prepareAckEntry(client, compressData, data, lastRefTime);
                    } else {
                        ackEntry = PushService.prepareAckEntry(client, PushService.prepareHostsData(client), lastRefTime);
                        if (ackEntry != null) {
                            cache.put(key, new Pair<>(ackEntry.origin.getData(), ackEntry.data));
                        }
                    }

                    Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                            new Object[]{client.getServiceName(), client.getAddrStr(), client.getAgent(),
                                    ackEntry == null ? null : ackEntry.key});
                    // udpPush tolerates a null entry (it logs and returns).
                    PushService.udpPush(ackEntry);
                }
            }
            catch (Exception e) {
                Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", (Object)serviceName, (Object)e);
            }
            finally {
                // Allow the next change of this service to schedule a new push.
                futureMap.remove(serviceKey);
            }
        }, 1000L, TimeUnit.MILLISECONDS);
        futureMap.put(serviceKey, future);
    }

    /**
     * Returns the number of distinct pushes attempted so far.
     *
     * @return current value of the shared push counter
     */
    public int getTotalPush() {
        return PushService.totalPush;
    }

    /**
     * Overwrites the shared push counter.
     *
     * @param totalPush the new counter value
     */
    public void setTotalPush(int totalPush) {
        PushService.totalPush = totalPush;
    }

    /**
     * Builds a {@link PushClient} from the given subscription details and registers it.
     *
     * @param namespaceId namespace of the subscribed service
     * @param serviceName name of the subscribed service
     * @param clusters    clusters the client is interested in
     * @param agent       client agent string
     * @param socketAddr  UDP address pushes are sent to
     * @param dataSource  source of the payload pushed to this client
     * @param tenant      client tenant
     * @param app         client application name
     */
    public void addClient(String namespaceId, String serviceName, String clusters, String agent, InetSocketAddress socketAddr, DataSource dataSource, String tenant, String app) {
        this.addClient(new PushClient(namespaceId, serviceName, clusters, agent, socketAddr, dataSource, tenant, app));
    }

    /**
     * Registers a subscriber for its service, or refreshes the existing registration.
     *
     * <p>Clients are stored per full service name and keyed by {@code client.toString()}.
     * If an entry with the same key already exists, only its last-refresh time is updated
     * and the passed instance is discarded.
     *
     * @param client the subscriber to register
     */
    public void addClient(PushClient client) {
        String serviceKey = UtilsAndCommons.assembleFullServiceName(client.getNamespaceId(), client.getServiceName());
        ConcurrentMap<String, PushClient> clients = clientMap.get(serviceKey);
        if (clients == null) {
            ConcurrentMap<String, PushClient> created = new ConcurrentHashMap<>(1024);
            ConcurrentMap<String, PushClient> raced = clientMap.putIfAbsent(serviceKey, created);
            // Another thread may have installed the map first; always use the installed one.
            clients = raced != null ? raced : created;
        }

        String clientKey = client.toString();
        PushClient oldClient = clients.get(clientKey);
        if (oldClient != null) {
            oldClient.refresh();
            return;
        }

        PushClient res = clients.putIfAbsent(clientKey, client);
        if (res != null) {
            // Lost a race with a concurrent registration of the same client: keep the winner alive.
            Loggers.PUSH.warn("client: {} already associated with key {}", (Object)res.getAddrStr(), (Object)res.toString());
            res.refresh();
            return;
        }
        Loggers.PUSH.debug("client: {} added for serviceName: {}", (Object)client.getAddrStr(), (Object)client.getServiceName());
    }

    /**
     * Lists the subscribers of exactly one service.
     *
     * @param serviceName name of the service
     * @param namespaceId namespace of the service
     * @return one {@link Subscriber} per registered client, or {@code null} when the
     *         service has never had a subscriber (kept as {@code null} for existing callers)
     */
    public List<Subscriber> getClients(String serviceName, String namespaceId) {
        String serviceKey = UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName);
        ConcurrentMap<String, PushClient> subscribed = clientMap.get(serviceKey);
        if (subscribed == null) {
            return null;
        }
        List<Subscriber> clients = new ArrayList<>(subscribed.size());
        for (PushClient client : subscribed.values()) {
            clients.add(new Subscriber(client.getAddrStr(), client.getAgent(), client.getApp(), client.getIp(), namespaceId, serviceName));
        }
        return clients;
    }

    /**
     * Lists the subscribers of every service in a namespace whose service name and group
     * name both contain the corresponding parts of {@code serviceName}.
     *
     * <p>Map keys have the form {@code namespaceId##groupedServiceName}. The namespace part
     * must match exactly; a plain prefix test would also return services of any namespace
     * whose id merely starts with {@code namespaceId}.
     *
     * @param serviceName grouped service name pattern; its service and group parts are
     *                    matched as substrings
     * @param namespaceId namespace to search in
     * @return matching subscribers, never {@code null}; empty when {@code namespaceId} is {@code null}
     */
    public List<Subscriber> getClientsFuzzy(String serviceName, String namespaceId) {
        List<Subscriber> clients = new ArrayList<>();
        if (namespaceId == null) {
            return clients;
        }
        // Separator between namespace id and grouped service name inside clientMap keys.
        String namespacePrefix = namespaceId + "##";
        // The pattern parts do not depend on the entry being inspected.
        String wantedName = NamingUtils.getServiceName(serviceName);
        String wantedGroup = NamingUtils.getGroupName(serviceName);

        clientMap.forEach((outKey, clientConcurrentMap) -> {
            if (!outKey.startsWith(namespacePrefix)) {
                return;
            }
            String serviceFullName = outKey.substring(namespacePrefix.length());
            String groupName = NamingUtils.getGroupName(serviceFullName);
            String name = NamingUtils.getServiceName(serviceFullName);
            if (name.contains(wantedName) && groupName.contains(wantedGroup)) {
                clientConcurrentMap.forEach((key, client) -> clients.add(new Subscriber(client.getAddrStr(), client.getAgent(), client.getApp(), client.getIp(), namespaceId, serviceFullName)));
            }
        });
        return clients;
    }

    /**
     * Sweeps all per-service client maps, removing every client that reports itself as a
     * zombie, and logs the number of remaining clients at debug level.
     */
    private static void removeClientIfZombie() {
        int remaining = 0;
        for (ConcurrentMap<String, PushClient> serviceClients : clientMap.values()) {
            for (Map.Entry<String, PushClient> registration : serviceClients.entrySet()) {
                if (registration.getValue().zombie()) {
                    serviceClients.remove(registration.getKey());
                }
            }
            remaining += serviceClients.size();
        }
        if (Loggers.PUSH.isDebugEnabled()) {
            Loggers.PUSH.debug("[NACOS-PUSH] clientMap size: {}", (Object)remaining);
        }
    }

    /**
     * Serializes {@code data} to JSON, compresses it when large, and wraps it in an ack
     * entry addressed to the client.
     *
     * <p>Side effect: {@code lastRefTime} is stored into {@code data} under the key
     * {@code "lastRefTime"} before serialization.
     *
     * @param client      the receiving client
     * @param data        payload to push; must not be empty
     * @param lastRefTime timestamp that becomes part of the payload and of the ack key
     * @return the prepared entry, or {@code null} when {@code data} is empty or the packet
     *         could not be built
     */
    private static Receiver.AckEntry prepareAckEntry(PushClient client, Map<String, Object> data, long lastRefTime) {
        if (MapUtils.isEmpty(data)) {
            Loggers.PUSH.error("[NACOS-PUSH] pushing empty data for client is not allowed: {}", (Object)client);
            return null;
        }
        data.put("lastRefTime", lastRefTime);

        InetSocketAddress target = client.getSocketAddr();
        String ackKey = PushService.getAckKey(target.getAddress().getHostAddress(), target.getPort(), lastRefTime);
        String json = JacksonUtils.toJson(data);
        try {
            byte[] payload = PushService.compressIfNecessary(json.getBytes(StandardCharsets.UTF_8));
            Receiver.AckEntry prepared = new Receiver.AckEntry(ackKey, new DatagramPacket(payload, payload.length, target));
            prepared.data = data;
            return prepared;
        }
        catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to prepare data: {} to client: {}, error: {}", new Object[]{data, target, e});
            return null;
        }
    }

    /**
     * Wraps already-serialized payload bytes in an ack entry addressed to the client.
     *
     * @param client      the receiving client
     * @param dataBytes   packet content, sent as-is
     * @param data        the data the bytes were produced from; kept on the entry
     * @param lastRefTime timestamp that becomes part of the ack key
     * @return the prepared entry, or {@code null} when the packet could not be built
     */
    private static Receiver.AckEntry prepareAckEntry(PushClient client, byte[] dataBytes, Map<String, Object> data, long lastRefTime) {
        InetSocketAddress target = client.getSocketAddr();
        String ackKey = PushService.getAckKey(target.getAddress().getHostAddress(), target.getPort(), lastRefTime);
        try {
            Receiver.AckEntry prepared = new Receiver.AckEntry(ackKey, new DatagramPacket(dataBytes, dataBytes.length, target));
            prepared.data = data;
            return prepared;
        }
        catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to prepare data: {} to client: {}, error: {}", new Object[]{data, target, e});
            return null;
        }
    }

    /**
     * Builds the key under which a prepared payload is cached during one push run.
     *
     * @param serviceName name of the pushed service
     * @param clientIP    unused; the key does not depend on the client address
     * @param agent       client agent string
     * @return {@code serviceName + "@@@@" + agent}
     */
    public static String getPushCacheKey(String serviceName, String clientIP, String agent) {
        StringBuilder cacheKey = new StringBuilder();
        cacheKey.append(serviceName);
        cacheKey.append("@@@@");
        cacheKey.append(agent);
        return cacheKey.toString();
    }

    /**
     * Publishes a {@code ServiceChangeEvent} for the service unless a push for it is
     * already scheduled (i.e. its full name is present in {@code futureMap}).
     *
     * @param service the service whose data changed
     */
    public void serviceChanged(Service service) {
        String serviceKey = UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName());
        boolean pushAlreadyScheduled = futureMap.containsKey(serviceKey);
        if (!pushAlreadyScheduled) {
            this.applicationContext.publishEvent((ApplicationEvent)new ServiceChangeEvent(this, service));
        }
    }

    /**
     * Decides whether UDP push may be used for a client with the given agent string.
     *
     * <p>Push must be enabled globally, and the client must be of a known type whose
     * version is at least the minimum configured for that type.
     *
     * @param agent client agent string, parsed by {@code ClientInfo}
     * @return {@code true} when push is enabled and the client version is high enough
     */
    public boolean canEnablePush(String agent) {
        if (!this.switchDomain.isPushEnabled()) {
            return false;
        }
        ClientInfo clientInfo = new ClientInfo(agent);

        String minimumVersion;
        if (clientInfo.type == ClientInfo.ClientType.JAVA) {
            minimumVersion = this.switchDomain.getPushJavaVersion();
        } else if (clientInfo.type == ClientInfo.ClientType.DNS) {
            // NOTE(review): DNS clients are gated by the Python version setting - confirm this is intended.
            minimumVersion = this.switchDomain.getPushPythonVersion();
        } else if (clientInfo.type == ClientInfo.ClientType.C) {
            minimumVersion = this.switchDomain.getPushCVersion();
        } else if (clientInfo.type == ClientInfo.ClientType.GO) {
            minimumVersion = this.switchDomain.getPushGoVersion();
        } else {
            return false;
        }
        return clientInfo.version.compareTo(VersionUtil.parseVersion(minimumVersion)) >= 0;
    }

    /**
     * Returns a snapshot of the pushes that are still waiting for an ack.
     *
     * @return a new list holding the current values of {@code ackMap}
     */
    public static List<Receiver.AckEntry> getFailedPushes() {
        List<Receiver.AckEntry> unacked = new ArrayList<Receiver.AckEntry>(ackMap.values());
        return unacked;
    }

    /**
     * Returns the number of pushes considered failed: those still unacknowledged plus
     * those already given up on.
     *
     * @return {@code ackMap.size() + failedPush}
     */
    public int getFailedPushCount() {
        int unacked = ackMap.size();
        return unacked + PushService.failedPush;
    }

    /**
     * Overwrites the shared counter of pushes that were given up on.
     *
     * @param failedPush the new counter value
     */
    public void setFailedPush(int failedPush) {
        PushService.failedPush = failedPush;
    }

    /**
     * Forgets all pushes that are still waiting for an ack. Only {@code ackMap} is
     * cleared; the counters and the send-time map are left untouched.
     */
    public static void resetPushState() {
        PushService.ackMap.clear();
    }

    /**
     * GZIP-compresses a payload once it reaches 1024 bytes; smaller payloads are returned
     * unchanged (the very same array instance).
     *
     * @param dataBytes payload to send
     * @return {@code dataBytes} itself when shorter than 1024 bytes, otherwise its GZIP form
     * @throws IOException if compression fails
     */
    private static byte[] compressIfNecessary(byte[] dataBytes) throws IOException {
        int maxDataSizeUncompress = 1024;
        if (dataBytes.length < maxDataSizeUncompress) {
            return dataBytes;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // try-with-resources releases the compressor even if write() throws; closing also
        // writes the GZIP trailer, so the bytes are only read after the block.
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(dataBytes);
        }
        return out.toByteArray();
    }

    /**
     * Builds the push command for a client: a map with {@code "type" -> "dom"} and
     * {@code "data"} set to whatever the client's data source returns for it.
     *
     * @param client the client to build the payload for
     * @return a new mutable map holding the two entries
     * @throws Exception if the data source fails
     */
    private static Map<String, Object> prepareHostsData(PushClient client) throws Exception {
        Map<String, Object> command = new HashMap<String, Object>(2);
        command.put("type", "dom");
        Object payload = client.getDataSource().getData(client);
        command.put("data", payload);
        return command;
    }

    /**
     * Sends one push packet and schedules a retransmission check.
     *
     * <p>The entry is recorded in {@code ackMap} and its send time in {@code udpSendTimeMap}
     * before sending. Once the entry's retry count exceeds {@code MAX_RETRY_TIMES}, or when
     * sending throws, the entry is removed from both maps and counted as a failed push.
     *
     * <p>NOTE(review): {@code totalPush} and {@code failedPush} are plain static ints
     * incremented from executor threads, so the counts can be slightly off under contention.
     *
     * @param ackEntry the prepared push; may be {@code null}
     * @return the entry when it was sent or given up on after too many retries;
     *         {@code null} when {@code ackEntry} is {@code null} or sending failed
     */
    private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
        if (ackEntry == null) {
            Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
            return null;
        }
        if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
            Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", (Object)ackEntry.getRetryTimes(), (Object)ackEntry.key);
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            ++failedPush;
            return ackEntry;
        }
        try {
            // put() reports whether the key was new in one step, so a push is counted once
            // even if a retry races with the first send.
            if (ackMap.put(ackEntry.key, ackEntry) == null) {
                ++totalPush;
            }
            udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
            Loggers.PUSH.info("send udp packet: " + ackEntry.key);
            udpSocket.send(ackEntry.origin);
            ackEntry.increaseRetryTime();
            // Re-check after the ack timeout; the retransmitter resends only if still unacked.
            GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
            return ackEntry;
        }
        catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", new Object[]{ackEntry.data, ackEntry.origin.getAddress().getHostAddress(), e});
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            ++failedPush;
            return null;
        }
    }

    /**
     * Builds the key that identifies one outstanding push.
     *
     * @param host        client host address; surrounding whitespace is stripped
     * @param port        client UDP port
     * @param lastRefTime timestamp carried in the push and echoed in the ack
     * @return {@code "host,port,lastRefTime"}
     */
    private static String getAckKey(String host, int port, long lastRefTime) {
        String strippedHost = StringUtils.strip((String)host);
        StringBuilder ackKey = new StringBuilder();
        ackKey.append(strippedHost).append(",").append(port).append(",").append(lastRefTime);
        return ackKey.toString();
    }

    /*
     * One-time setup: opens the UDP socket, starts the daemon thread that receives acks,
     * and schedules the zombie-client sweep to run every 20 seconds. If the socket cannot
     * be opened, neither the receiver nor the sweep is started.
     */
    static {
        futureMap = new ConcurrentHashMap<String, Future>();
        try {
            udpSocket = new DatagramSocket();
            Receiver receiver = new Receiver();
            Thread inThread = new Thread(receiver);
            inThread.setDaemon(true);
            inThread.setName("com.alibaba.nacos.naming.push.receiver");
            inThread.start();
            GlobalExecutor.scheduleRetransmitter(() -> {
                try {
                    PushService.removeClientIfZombie();
                }
                catch (Throwable e) {
                    // Keep the periodic task alive, but do not lose the reason for the failure.
                    Loggers.PUSH.warn("[NACOS-PUSH] failed to remove client zombie", e);
                }
            }, 0L, 20L, TimeUnit.SECONDS);
        }
        catch (SocketException e) {
            Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service", e);
        }
    }

    /**
     * Endless loop that reads ack packets from the push socket and settles the matching
     * outstanding push: the entry is removed from {@code ackMap}, its send time from
     * {@code udpSendTimeMap}, and the round-trip cost is recorded in {@code pushCostMap}.
     */
    public static class Receiver
    implements Runnable {
        @Override
        public void run() {
            // Only this thread touches the buffer, and each packet is copied into a String
            // before the next receive, so one buffer can serve every iteration.
            byte[] buffer = new byte[65536];
            while (true) {
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                try {
                    udpSocket.receive(packet);
                    String json = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8).trim();
                    AckPacket ackPacket = (AckPacket)JacksonUtils.toObj((String)json, AckPacket.class);
                    InetSocketAddress socketAddress = (InetSocketAddress)packet.getSocketAddress();
                    String ip = socketAddress.getAddress().getHostAddress();
                    int port = socketAddress.getPort();
                    if (System.nanoTime() - ackPacket.lastRefTime > ACK_TIMEOUT_NANOS) {
                        Loggers.PUSH.warn("ack takes too long from {} ack json: {}", (Object)packet.getSocketAddress(), (Object)json);
                    }
                    String ackKey = PushService.getAckKey(ip, port, ackPacket.lastRefTime);
                    AckEntry ackEntry = ackMap.remove(ackKey);
                    if (ackEntry == null) {
                        throw new IllegalStateException("unable to find ackEntry for key: " + ackKey + ", ack json: " + json);
                    }
                    // The send time can already be gone (e.g. the push was given up on
                    // concurrently); unboxing a missing value would throw.
                    Long sentAt = udpSendTimeMap.remove(ackKey);
                    if (sentAt == null) {
                        Loggers.PUSH.warn("received ack: {} from: {}:{}, but no send time is recorded for it", new Object[]{json, ip, port});
                        continue;
                    }
                    long pushCost = System.currentTimeMillis() - sentAt;
                    Loggers.PUSH.info("received ack: {} from: {}:{}, cost: {} ms, unacked: {}, total push: {}", new Object[]{json, ip, port, pushCost, ackMap.size(), totalPush});
                    pushCostMap.put(ackKey, pushCost);
                }
                catch (Throwable e) {
                    // Never let a bad packet end the receiver thread.
                    Loggers.PUSH.error("[NACOS-PUSH] error while receiving ack data", e);
                }
            }
        }

        /** Ack message sent back by a client; populated from JSON. */
        public static class AckPacket {
            public String type;
            /** Echo of the {@code lastRefTime} carried in the push being acknowledged. */
            public long lastRefTime;
            public String data;
        }

        /** One outstanding push: the packet that was sent, its key and how often it was sent. */
        public static class AckEntry {
            /** Ack key, "host,port,lastRefTime". */
            public String key;
            /** The packet as sent; resent unchanged on retry. */
            public DatagramPacket origin;
            private final AtomicInteger retryTimes = new AtomicInteger(0);
            /** The data the packet was built from. */
            public Map<String, Object> data;

            public AckEntry(String key, DatagramPacket packet) {
                this.key = key;
                this.origin = packet;
            }

            /** Records one more send of this entry. */
            public void increaseRetryTime() {
                this.retryTimes.incrementAndGet();
            }

            /** @return how many times this entry has been sent so far */
            public int getRetryTimes() {
                return this.retryTimes.get();
            }
        }
    }

    /**
     * Delayed task that pushes an entry again if it is still present in {@code ackMap},
     * i.e. no ack has removed it in the meantime.
     */
    public static class Retransmitter
    implements Runnable {
        Receiver.AckEntry ackEntry;

        public Retransmitter(Receiver.AckEntry ackEntry) {
            this.ackEntry = ackEntry;
        }

        @Override
        public void run() {
            boolean stillUnacked = ackMap.containsKey(this.ackEntry.key);
            if (!stillUnacked) {
                return;
            }
            Loggers.PUSH.info("retry to push data, key: " + this.ackEntry.key);
            PushService.udpPush(this.ackEntry);
        }
    }

    /**
     * A subscriber that receives UDP pushes for one service.
     *
     * <p>This is an inner (non-static) class because {@link #zombie()} reads the enclosing
     * service's {@code switchDomain}. Instances are stored in {@code clientMap} keyed by
     * {@link #toString()}.
     */
    public class PushClient {
        private String namespaceId;
        private String serviceName;
        private String clusters;
        private String agent;
        private String tenant;
        private String app;
        private InetSocketAddress socketAddr;
        private DataSource dataSource;
        private Map<String, String[]> params;
        /**
         * Wall-clock time (ms) of the last registration or refresh. Volatile because it is
         * written by {@link #refresh()} and read by {@link #zombie()} on other threads
         * (scheduled push and sweep tasks).
         */
        public volatile long lastRefTime = System.currentTimeMillis();

        public Map<String, String[]> getParams() {
            return this.params;
        }

        public void setParams(Map<String, String[]> params) {
            this.params = params;
        }

        public PushClient(String namespaceId, String serviceName, String clusters, String agent, InetSocketAddress socketAddr, DataSource dataSource, String tenant, String app) {
            this.namespaceId = namespaceId;
            this.serviceName = serviceName;
            this.clusters = clusters;
            this.agent = agent;
            this.socketAddr = socketAddr;
            this.dataSource = dataSource;
            this.tenant = tenant;
            this.app = app;
        }

        public DataSource getDataSource() {
            return this.dataSource;
        }

        /**
         * @return {@code true} when the client has not been refreshed within the push cache
         *         window configured for its service
         */
        public boolean zombie() {
            return System.currentTimeMillis() - this.lastRefTime > PushService.this.switchDomain.getPushCacheMillis(this.serviceName);
        }

        /**
         * Textual identity of the client; also used as its key in the per-service client
         * map, so the format must stay stable.
         */
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("serviceName: ").append(this.serviceName).append(", clusters: ").append(this.clusters).append(", address: ").append(this.socketAddr).append(", agent: ").append(this.agent);
            return sb.toString();
        }

        public String getAgent() {
            return this.agent;
        }

        /** @return the push target as {@code "hostAddress:port"} */
        public String getAddrStr() {
            return this.socketAddr.getAddress().getHostAddress() + ":" + this.socketAddr.getPort();
        }

        /** @return the host address of the push target */
        public String getIp() {
            return this.socketAddr.getAddress().getHostAddress();
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.serviceName, this.clusters, this.socketAddr);
        }

        /**
         * Two clients are equal when service name, clusters and socket address are equal.
         * Null fields are compared null-safely, matching {@link #hashCode()}.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof PushClient)) {
                return false;
            }
            PushClient other = (PushClient)obj;
            return Objects.equals(this.serviceName, other.serviceName)
                    && Objects.equals(this.clusters, other.clusters)
                    && Objects.equals(this.socketAddr, other.socketAddr);
        }

        public String getClusters() {
            return this.clusters;
        }

        public void setClusters(String clusters) {
            this.clusters = clusters;
        }

        public String getNamespaceId() {
            return this.namespaceId;
        }

        public void setNamespaceId(String namespaceId) {
            this.namespaceId = namespaceId;
        }

        public String getServiceName() {
            return this.serviceName;
        }

        public void setServiceName(String serviceName) {
            this.serviceName = serviceName;
        }

        public String getTenant() {
            return this.tenant;
        }

        public void setTenant(String tenant) {
            this.tenant = tenant;
        }

        public String getApp() {
            return this.app;
        }

        public void setApp(String app) {
            this.app = app;
        }

        public InetSocketAddress getSocketAddr() {
            return this.socketAddr;
        }

        /** Marks the client as alive now, postponing the point at which it becomes a zombie. */
        public void refresh() {
            this.lastRefTime = System.currentTimeMillis();
        }
    }
}

