/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.protocols.pbcast;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Header;
import org.jgroups.Message;
import org.jgroups.ObjectMessage;
import org.jgroups.View;
import org.jgroups.ViewId;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.conf.AttributeType;
import org.jgroups.protocols.TCP;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Digest;
import org.jgroups.util.FixedSizeBitSet;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.MutableDigest;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Util;

@MBean(description="Computes the broadcast messages that are stable")
public class STABLE
extends Protocol {
    protected static final long MAX_SUSPEND_TIME = 200000L;
    @Property(description="Average time to send a STABLE message", type=AttributeType.TIME)
    protected long desired_avg_gossip = 20000L;
    @Property(description="Maximum number of bytes received in all messages before sending a STABLE message is triggered", type=AttributeType.BYTES)
    protected long max_bytes = 2000000L;
    protected int num_stable_msgs_sent;
    protected int num_stable_msgs_received;
    protected int num_stability_msgs_sent;
    protected int num_stability_msgs_received;
    protected volatile View view;
    protected volatile MutableDigest digest;
    protected FixedSizeBitSet votes;
    protected final Lock lock = new ReentrantLock();
    protected Future<?> stable_task_future;
    protected final Lock stable_task_lock = new ReentrantLock();
    protected TimeScheduler timer;
    @ManagedAttribute(description="Bytes accumulated so far", type=AttributeType.BYTES)
    protected long num_bytes_received;
    protected final Lock received = new ReentrantLock();
    @ManagedAttribute
    protected volatile boolean suspended;
    protected boolean initialized;
    protected Future<?> resume_task_future;
    protected final Object resume_task_mutex = new Object();
    @ManagedAttribute(description="The coordinator")
    protected volatile Address coordinator;

    public long getDesiredAverageGossip() {
        return this.desired_avg_gossip;
    }

    public STABLE setDesiredAverageGossip(long g2) {
        this.desired_avg_gossip = g2;
        return this;
    }

    public long getMaxBytes() {
        return this.max_bytes;
    }

    public STABLE setMaxBytes(long m3) {
        this.max_bytes = m3;
        return this;
    }

    public long getBytes() {
        return this.num_bytes_received;
    }

    @ManagedAttribute(type=AttributeType.SCALAR)
    public int getStableSent() {
        return this.num_stable_msgs_sent;
    }

    @ManagedAttribute(type=AttributeType.SCALAR)
    public int getStableReceived() {
        return this.num_stable_msgs_received;
    }

    @ManagedAttribute(type=AttributeType.SCALAR)
    public int getStabilitySent() {
        return this.num_stability_msgs_sent;
    }

    @ManagedAttribute(type=AttributeType.SCALAR)
    public int getStabilityReceived() {
        return this.num_stability_msgs_received;
    }

    @ManagedAttribute(description="The number of votes for the current digest")
    public int getNumVotes() {
        return this.votes != null ? this.votes.cardinality() : 0;
    }

    @ManagedAttribute
    public boolean getStableTaskRunning() {
        this.stable_task_lock.lock();
        try {
            boolean bl = this.stable_task_future != null && !this.stable_task_future.isDone() && !this.stable_task_future.isCancelled();
            return bl;
        }
        finally {
            this.stable_task_lock.unlock();
        }
    }

    @ManagedOperation(description="Sends a STABLE message; when every member has received a STABLE message from everybody else, a STABILITY message will be sent")
    public void gc() {
        this.sendStableMessage(false);
    }

    @ManagedOperation(description="Prints the current digest")
    public String printDigest() {
        return this.printDigest(this.digest);
    }

    @ManagedOperation(description="Prints the current votes")
    public String printVotes() {
        return this.votes != null ? this.votes.toString() : "n/a";
    }

    @Override
    public void resetStats() {
        super.resetStats();
        this.num_stable_msgs_received = 0;
        this.num_stable_msgs_sent = 0;
        this.num_stability_msgs_sent = 0;
        this.num_stability_msgs_received = 0;
    }

    @Override
    public List<Integer> requiredDownServices() {
        return Collections.singletonList(39);
    }

    protected void suspend(long timeout2) {
        if (!this.suspended) {
            this.suspended = true;
            this.log.debug("%s: suspending message garbage collection", this.local_addr);
        }
        this.startResumeTask(timeout2);
    }

    protected void resume() {
        this.lock.lock();
        try {
            this.resetDigest();
            this.suspended = false;
        }
        finally {
            this.lock.unlock();
        }
        this.log.debug("%s: resuming message garbage collection", this.local_addr);
        this.stopResumeTask();
    }

    @Override
    public void init() throws Exception {
        super.init();
    }

    @Override
    public void start() throws Exception {
        this.timer = this.getTransport().getTimer();
        if (this.timer == null) {
            throw new Exception("timer cannot be retrieved");
        }
        if (this.desired_avg_gossip > 0L) {
            this.startStableTask();
        }
    }

    @Override
    public void stop() {
        this.stopStableTask();
    }

    @Override
    public Object up(Event evt) {
        switch (evt.getType()) {
            case 6: {
                Object retval = this.up_prot.up(evt);
                this.handleViewChange((View)evt.getArg());
                return retval;
            }
        }
        return this.up_prot.up(evt);
    }

    @Override
    public Object up(Message msg) {
        StableHeader hdr = (StableHeader)msg.getHeader(this.id);
        if (hdr == null) {
            this.handleRegularMessage(msg);
            return this.up_prot.up(msg);
        }
        return this.handle(hdr, msg.getSrc(), (Digest)msg.getObject());
    }

    @Override
    public void up(MessageBatch batch) {
        Iterator<Message> it = batch.iterator();
        while (it.hasNext()) {
            Message msg = it.next();
            StableHeader hdr = (StableHeader)msg.getHeader(this.id);
            if (hdr == null) continue;
            it.remove();
            this.handle(hdr, batch.sender(), (Digest)msg.getObject());
        }
        if (this.max_bytes > 0L && batch.dest() == null && !batch.isEmpty() && this.maxBytesExceeded(batch.length())) {
            this.sendStableMessage(true);
        }
        if (!batch.isEmpty()) {
            this.up_prot.up(batch);
        }
    }

    @Override
    public Object down(Message msg) {
        boolean send_stable_msg = this.max_bytes > 0L && msg.getDest() == null && msg.isFlagSet(Message.TransientFlag.DONT_LOOPBACK) && this.maxBytesExceeded(msg.getLength());
        Object retval = this.down_prot.down(msg);
        if (send_stable_msg) {
            this.sendStableMessage(true);
        }
        return retval;
    }

    @Override
    public Object down(Event evt) {
        switch (evt.getType()) {
            case 6: {
                Object retval = this.down_prot.down(evt);
                this.handleViewChange((View)evt.getArg());
                return retval;
            }
            case 65: {
                long timeout2 = 200000L;
                Object t = evt.getArg();
                if (t instanceof Long) {
                    timeout2 = (Long)t;
                }
                this.suspend(timeout2);
                break;
            }
            case 66: {
                this.resume();
            }
        }
        return this.down_prot.down(evt);
    }

    protected Object handle(StableHeader hdr, Address sender, Digest digest) {
        switch (hdr.type) {
            case 1: {
                this.handleStableMessage(digest, sender, hdr.view_id);
                break;
            }
            case 2: {
                this.handleStabilityMessage(digest, sender, hdr.view_id);
                break;
            }
            default: {
                this.log.error("%s: StableHeader type %s not known", this.local_addr, hdr.type);
            }
        }
        return null;
    }

    protected void handleRegularMessage(Message msg) {
        if (this.max_bytes > 0L && msg.getDest() == null && this.maxBytesExceeded(msg.getLength())) {
            this.sendStableMessage(true);
        }
    }

    protected boolean maxBytesExceeded(int len) {
        this.received.lock();
        try {
            this.num_bytes_received += (long)len;
            if (this.num_bytes_received >= this.max_bytes) {
                this.log.trace("%s: max_bytes (%d) has been exceeded; bytes received=%d: triggers stable msg", this.local_addr, this.max_bytes, this.num_bytes_received);
                this.num_bytes_received = 0L;
                boolean bl = true;
                return bl;
            }
            boolean bl = false;
            return bl;
        }
        finally {
            this.received.unlock();
        }
    }

    protected void handleViewChange(View v) {
        this.lock.lock();
        try {
            this.view = v;
            this.coordinator = v.getCoord();
            this.resetDigest();
            if (!this.initialized) {
                this.initialized = true;
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    protected void updateLocalDigest(Digest d, Address sender) {
        StringBuilder sb = null;
        if (this.log.isTraceEnabled()) {
            sb = new StringBuilder().append(this.local_addr).append(": handling digest from ").append(sender).append(":\nmine:   ").append(this.printDigest(this.digest)).append("\nsender: ").append(this.printDigest(d));
        }
        for (Digest.Entry entry : d) {
            Address mbr = entry.getMember();
            long hd = entry.getHighestDeliveredSeqno();
            long hr = entry.getHighestReceivedSeqno();
            long[] seqnos = this.digest.get(mbr);
            if (seqnos == null) continue;
            long my_hd = seqnos[0];
            long my_hr = seqnos[1];
            if (my_hd == -1L) {
                my_hd = hd;
            }
            long new_hd = Math.min(my_hd, hd);
            long new_hr = Math.max(my_hr, hr);
            this.digest.set(mbr, new_hd, new_hr);
        }
        if (sb != null) {
            this.log.trace(sb.append("\nresult: ").append(this.printDigest(this.digest)).append("\n"));
        }
    }

    protected void resetDigest() {
        if (this.view == null) {
            return;
        }
        this.digest = new MutableDigest(this.view.getMembersRaw());
        this.votes = new FixedSizeBitSet(this.view.size());
    }

    protected boolean addVote(int rank) {
        try {
            return this.votes.set(rank) && STABLE.allVotesReceived(this.votes);
        }
        catch (Throwable t) {
            return false;
        }
    }

    protected static boolean allVotesReceived(FixedSizeBitSet votes) {
        return votes.cardinality() == votes.size();
    }

    protected static int getRank(Address member, View v) {
        if (v == null || member == null) {
            return -1;
        }
        Address[] members2 = v.getMembersRaw();
        for (int i = 0; i < members2.length; ++i) {
            if (!member.equals(members2[i])) continue;
            return i;
        }
        return -1;
    }

    protected void startStableTask() {
        this.stable_task_lock.lock();
        try {
            if (this.stable_task_future == null || this.stable_task_future.isDone()) {
                StableTask stable_task = new StableTask();
                this.stable_task_future = this.timer.scheduleWithDynamicInterval(stable_task, this.getTransport() instanceof TCP);
                this.log.trace("%s: stable task started", this.local_addr);
            }
        }
        finally {
            this.stable_task_lock.unlock();
        }
    }

    protected void stopStableTask() {
        this.stable_task_lock.lock();
        try {
            if (this.stable_task_future != null) {
                this.stable_task_future.cancel(false);
                this.stable_task_future = null;
            }
        }
        finally {
            this.stable_task_lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void startResumeTask(long max_suspend_time) {
        if ((max_suspend_time = (long)((double)max_suspend_time * 1.1)) <= 0L) {
            max_suspend_time = 200000L;
        }
        Object object = this.resume_task_mutex;
        synchronized (object) {
            if (this.resume_task_future == null || this.resume_task_future.isDone()) {
                ResumeTask resume_task = new ResumeTask();
                this.resume_task_future = this.timer.schedule(resume_task, max_suspend_time, TimeUnit.MILLISECONDS, false);
                this.log.debug("%s: resume task started, max_suspend_time=%d", this.local_addr, max_suspend_time);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void stopResumeTask() {
        Object object = this.resume_task_mutex;
        synchronized (object) {
            if (this.resume_task_future != null) {
                this.resume_task_future.cancel(false);
                this.resume_task_future = null;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void handleStableMessage(Digest d, Address sender, ViewId view_id) {
        if (d == null || sender == null) {
            if (this.log.isErrorEnabled()) {
                this.log.error(Util.getMessage("DigestOrSenderIsNull"));
            }
            return;
        }
        if (!this.initialized || this.suspended) {
            this.log.trace("%s: STABLE message is ignored: initialized=%b, suspended=%b", this.local_addr, this.initialized, this.suspended);
            return;
        }
        if (!view_id.equals(this.view.getViewId())) {
            this.log.trace("%s: discarded STABLE message with different view-id %s (my view-id=%s)", this.local_addr, view_id, this.view.getViewId());
            return;
        }
        MutableDigest stable_digest = null;
        ViewId stable_view_id = null;
        this.lock.lock();
        try {
            int rank = STABLE.getRank(sender, this.view);
            if (rank < 0 || this.votes.get(rank)) {
                return;
            }
            ++this.num_stable_msgs_received;
            this.updateLocalDigest(d, sender);
            if (this.addVote(rank)) {
                stable_digest = this.digest;
                stable_view_id = this.view.getViewId();
                this.resetDigest();
            }
        }
        catch (Throwable t) {
            return;
        }
        finally {
            this.lock.unlock();
        }
        if (stable_digest != null) {
            this.resetNumBytes();
            this.sendStabilityMessage(stable_digest, stable_view_id);
            this.down_prot.down(new Event(30, stable_digest));
        }
    }

    protected void resetNumBytes() {
        this.received.lock();
        try {
            this.num_bytes_received = 0L;
        }
        finally {
            this.received.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void handleStabilityMessage(Digest stable_digest, Address sender, ViewId view_id) {
        if (stable_digest == null) {
            if (this.log.isErrorEnabled()) {
                this.log.error(Util.getMessage("StabilityDigestIsNull"));
            }
            return;
        }
        if (!this.initialized || this.suspended) {
            this.log.trace("%s: STABLE message is ignored: initialized=%b, suspended=%b", this.local_addr, this.initialized, this.suspended);
            return;
        }
        this.lock.lock();
        try {
            if (!view_id.equals(this.view.getViewId())) {
                this.log.trace("%s: discarded STABILITY message with different view-id %s (my view-id=%s)", this.local_addr, view_id, this.view);
                return;
            }
            this.log.trace("%s: received stability msg from %s: %s", this.local_addr, sender, this.printDigest(stable_digest));
            ++this.num_stability_msgs_received;
            this.resetDigest();
        }
        finally {
            this.lock.unlock();
        }
        this.resetNumBytes();
        this.down_prot.down(new Event(30, stable_digest));
    }

    protected void sendStableMessage(boolean send_in_background) {
        boolean all_set;
        if (this.suspended || this.view == null) {
            return;
        }
        View current_view = this.view;
        Address dest = this.coordinator;
        boolean is_coord = Objects.equals(this.local_addr, this.coordinator);
        MutableDigest d = new MutableDigest(current_view.getMembersRaw()).set(this.getDigest());
        boolean bl = all_set = d.allSet() || d.set(this.getDigest()).allSet();
        if (!all_set) {
            this.log.trace("%s: could not find matching digest for view %s, missing members: %s", this.local_addr, current_view, d.getNonSetMembers());
            return;
        }
        if (is_coord) {
            this.log.trace("%s: updating the local digest with a stable message (coordinator): %s", this.local_addr, d);
            ++this.num_stable_msgs_sent;
            this.handleStableMessage(d, this.local_addr, current_view.getViewId());
            return;
        }
        this.log.trace("%s: sending stable msg to %s: %s", this.local_addr, dest, this.printDigest(d));
        final Message msg = new ObjectMessage(dest, d).setFlag(Message.Flag.OOB, Message.Flag.NO_RELIABILITY).putHeader(this.id, new StableHeader(1, current_view.getViewId()));
        try {
            if (!send_in_background) {
                ++this.num_stable_msgs_sent;
                this.down_prot.down(msg);
                return;
            }
            Runnable r = new Runnable(){

                @Override
                public void run() {
                    STABLE.this.down_prot.down(msg);
                    ++STABLE.this.num_stable_msgs_sent;
                }

                public String toString() {
                    return STABLE.class.getSimpleName() + ": STABLE-GOSSIP";
                }
            };
            this.timer.execute(r, this.getTransport() instanceof TCP);
        }
        catch (Throwable t) {
            this.log.warn("failed sending STABLE message", t);
        }
    }

    protected void sendStabilityMessage(Digest d, ViewId view_id) {
        if (this.suspended) {
            this.log.debug("%s: STABILITY message will not be sent as suspended=%b", this.local_addr, this.suspended);
            return;
        }
        try {
            Message msg = new ObjectMessage(null, d).setFlag(Message.Flag.OOB, Message.Flag.NO_RELIABILITY).setFlag(Message.TransientFlag.DONT_LOOPBACK).putHeader(this.id, new StableHeader(2, view_id));
            this.log.trace("%s: sending stability msg %s", this.local_addr, this.printDigest(d));
            ++this.num_stability_msgs_sent;
            ++this.num_stability_msgs_received;
            this.down_prot.down(msg);
        }
        catch (Exception e) {
            this.log.warn("%s: failed sending STABILITY message: %s", this.local_addr, e);
        }
    }

    protected Digest getDigest() {
        return (Digest)this.down_prot.down(Event.GET_DIGEST_EVT);
    }

    protected String printDigest(Digest digest) {
        if (digest == null) {
            return null;
        }
        return this.view != null ? digest.toString(this.view.getMembersRaw(), false) : digest.toString();
    }

    protected class ResumeTask
    implements Runnable {
        protected ResumeTask() {
        }

        @Override
        public void run() {
            if (STABLE.this.suspended) {
                STABLE.this.log.warn("%s: ResumeTask resumed message garbage collection - this should be done by a RESUME_STABLE event; check why this event was not received (or increase max_suspend_time for large state transfers)", STABLE.this.local_addr);
            }
            STABLE.this.resume();
        }

        public String toString() {
            return STABLE.class.getSimpleName() + ": ResumeTask";
        }
    }

    protected class StableTask
    implements TimeScheduler.Task {
        protected StableTask() {
        }

        @Override
        public long nextInterval() {
            long interval = this.computeSleepTime();
            return interval <= 0L ? STABLE.this.desired_avg_gossip / 2L : interval;
        }

        @Override
        public void run() {
            if (STABLE.this.suspended) {
                STABLE.this.log.trace("%s: stable task will not run as suspended=true", STABLE.this.local_addr);
                return;
            }
            STABLE.this.sendStableMessage(false);
        }

        public String toString() {
            return STABLE.class.getSimpleName() + ": StableTask";
        }

        long computeSleepTime() {
            return Util.random(STABLE.this.desired_avg_gossip * 2L);
        }
    }

    public static class StableHeader
    extends Header {
        public static final byte STABLE_GOSSIP = 1;
        public static final byte STABILITY = 2;
        protected byte type;
        protected ViewId view_id;

        public StableHeader() {
        }

        public StableHeader(byte type2, ViewId view_id) {
            this.type = type2;
            this.view_id = view_id;
        }

        @Override
        public short getMagicId() {
            return 56;
        }

        @Override
        public Supplier<? extends Header> create() {
            return StableHeader::new;
        }

        static String type2String(byte t) {
            switch (t) {
                case 1: {
                    return "STABLE_GOSSIP";
                }
                case 2: {
                    return "STABILITY";
                }
            }
            return "<unknown>";
        }

        @Override
        public String toString() {
            return String.format("[%s] view-id= %s", StableHeader.type2String(this.type), this.view_id);
        }

        @Override
        public int serializedSize() {
            return 1 + Util.size(this.view_id);
        }

        @Override
        public void writeTo(DataOutput out) throws IOException {
            out.writeByte(this.type);
            Util.writeViewId(this.view_id, out);
        }

        @Override
        public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
            this.type = in.readByte();
            this.view_id = Util.readViewId(in);
        }
    }
}

