/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.eventhubs.impl;

import com.microsoft.azure.eventhubs.OperationCancelledException;
import com.microsoft.azure.eventhubs.TimeoutException;
import com.microsoft.azure.eventhubs.impl.AmqpConnection;
import com.microsoft.azure.eventhubs.impl.AmqpResponseCode;
import com.microsoft.azure.eventhubs.impl.DispatchHandler;
import com.microsoft.azure.eventhubs.impl.ExceptionUtil;
import com.microsoft.azure.eventhubs.impl.FaultTolerantObject;
import com.microsoft.azure.eventhubs.impl.IOObject;
import com.microsoft.azure.eventhubs.impl.OperationResult;
import com.microsoft.azure.eventhubs.impl.ReactorDispatcher;
import com.microsoft.azure.eventhubs.impl.RequestResponseChannel;
import com.microsoft.azure.eventhubs.impl.RequestResponseCloser;
import com.microsoft.azure.eventhubs.impl.RequestResponseOpener;
import com.microsoft.azure.eventhubs.impl.SessionProvider;
import com.microsoft.azure.eventhubs.impl.StringUtil;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.message.Message;

final class ManagementChannel {
    final FaultTolerantObject<RequestResponseChannel> innerChannel;
    final SessionProvider sessionProvider;
    final AmqpConnection connectionEventDispatcher;

    public ManagementChannel(SessionProvider sessionProvider, AmqpConnection connection) {
        this.sessionProvider = sessionProvider;
        this.connectionEventDispatcher = connection;
        RequestResponseCloser closer = new RequestResponseCloser();
        this.innerChannel = new FaultTolerantObject<RequestResponseChannel>(new RequestResponseOpener(sessionProvider, "mgmt-session", "mgmt", "$management", connection), closer);
        closer.setInnerChannel(this.innerChannel);
    }

    public CompletableFuture<Map<String, Object>> request(ReactorDispatcher dispatcher, Map<String, Object> request, final long timeoutInMillis) {
        final Message requestMessage = Proton.message();
        ApplicationProperties applicationProperties = new ApplicationProperties(request);
        requestMessage.setApplicationProperties(applicationProperties);
        final CompletableFuture<Map<String, Object>> resultFuture = new CompletableFuture<Map<String, Object>>();
        try {
            dispatcher.invoke((int)timeoutInMillis, new DispatchHandler(){

                @Override
                public void onEvent() {
                    String errorMessage;
                    RequestResponseChannel channel = ManagementChannel.this.innerChannel.unsafeGetIfOpened();
                    if (channel != null && channel.getState() == IOObject.IOObjectState.OPENED) {
                        String remoteContainerId = channel.getSendLink().getSession().getConnection().getRemoteContainer();
                        errorMessage = String.format("Management request timed out (%sms), after not receiving response from service. TrackingId: %s", timeoutInMillis, StringUtil.isNullOrEmpty(remoteContainerId) ? "n/a" : remoteContainerId);
                    } else {
                        errorMessage = "Management request timed out on the client - enable info level tracing to diagnose.";
                    }
                    resultFuture.completeExceptionally(new TimeoutException(errorMessage));
                }
            });
        }
        catch (IOException ioException) {
            resultFuture.completeExceptionally(new OperationCancelledException("Sending request failed while dispatching to Reactor, see cause for more details.", (Throwable)ioException));
            return resultFuture;
        }
        if (timeoutInMillis > 5L) {
            this.innerChannel.runOnOpenedObject(dispatcher, new OperationResult<RequestResponseChannel, Exception>(){

                @Override
                public void onComplete(RequestResponseChannel result) {
                    result.request(requestMessage, new OperationResult<Message, Exception>(){

                        @Override
                        public void onComplete(Message response) {
                            int statusCode = (Integer)response.getApplicationProperties().getValue().get("status-code");
                            String statusDescription = (String)response.getApplicationProperties().getValue().get("status-description");
                            if (statusCode == AmqpResponseCode.ACCEPTED.getValue() || statusCode == AmqpResponseCode.OK.getValue()) {
                                if (response.getBody() != null) {
                                    resultFuture.complete((Map)((AmqpValue)response.getBody()).getValue());
                                }
                            } else {
                                this.onError(ExceptionUtil.amqpResponseCodeToException(statusCode, statusDescription));
                            }
                        }

                        @Override
                        public void onError(Exception error) {
                            resultFuture.completeExceptionally(error);
                        }
                    });
                }

                @Override
                public void onError(Exception error) {
                    resultFuture.completeExceptionally(error);
                }
            });
        }
        return resultFuture;
    }

    public void close(ReactorDispatcher reactorDispatcher, OperationResult<Void, Exception> closeCallback) {
        this.innerChannel.close(reactorDispatcher, closeCallback);
    }
}

