package whatap.bci.reactor;

import java.util.function.BiFunction;
import java.util.function.Consumer;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.SignalType;
import reactor.util.context.Context;
import whatap.agent.Configure;
import whatap.agent.Logger;
import whatap.agent.api.trace.TxTrace;
import whatap.agent.trace.TraceContext;
import whatap.agent.trace.TraceContextManager;

/* loaded from: input_file:weaving/bci-reactor-1.0.jar:whatap/bci/reactor/HookReactive.class */
public class HookReactive {
    static Configure configure = Configure.getInstance();
    private static boolean isHook = false;

    public static <T> Mono<T> attachTxid(Mono<T> mono, long j) {
        return mono.contextWrite(Context.of(TraceContext.WTP_TXID, Long.valueOf(j)));
    }

    public static <T> Flux<T> attachTxid(Flux<T> flux, long j) {
        return flux.contextWrite(Context.of(TraceContext.WTP_TXID, Long.valueOf(j)));
    }

    public static <T> Mono<T> traceSubscriber(Mono<T> mono, final TraceContext traceContext) {
        if (traceContext == null) {
            return mono;
        }
        try {
            return mono.contextWrite(Context.of(TraceContext.WTP_TXID, Long.valueOf(traceContext.txid))).doOnError(new Consumer<Throwable>() { // from class: whatap.bci.reactor.HookReactive.1
                @Override // java.util.function.Consumer
                public void accept(Throwable th) {
                    TraceContext.this.handleMethodErrorStack(th);
                }
            }).doFinally(new Consumer<SignalType>() { // from class: whatap.bci.reactor.HookReactive.2
                @Override // java.util.function.Consumer
                public void accept(SignalType signalType) {
                    TxTrace.endTx(TraceContext.this, null);
                }
            }).doAfterTerminate(new Runnable() { // from class: whatap.bci.reactor.HookReactive.3
                @Override // java.lang.Runnable
                public void run() {
                }
            });
        } catch (Throwable th) {
            Logger.println(th.getMessage(), th);
            return mono;
        }
    }

    public static <T> Flux<T> traceSubscriber(Flux<T> flux, final TraceContext traceContext) {
        if (traceContext == null) {
            return flux;
        }
        try {
            return flux.contextWrite(Context.of(TraceContext.WTP_TXID, Long.valueOf(traceContext.txid))).doOnError(new Consumer<Throwable>() { // from class: whatap.bci.reactor.HookReactive.4
                @Override // java.util.function.Consumer
                public void accept(Throwable th) {
                    TraceContext.this.handleMethodErrorStack(th);
                }
            }).doFinally(new Consumer<SignalType>() { // from class: whatap.bci.reactor.HookReactive.5
                @Override // java.util.function.Consumer
                public void accept(SignalType signalType) {
                    TxTrace.endTx(TraceContext.this, null);
                }
            }).doAfterTerminate(new Runnable() { // from class: whatap.bci.reactor.HookReactive.6
                @Override // java.lang.Runnable
                public void run() {
                }
            });
        } catch (Throwable th) {
            Logger.println(th.getMessage(), th);
            return flux;
        }
    }

    public static void hookOperator() {
        if (isHook) {
            return;
        }
        isHook = true;
        try {
            Hooks.onEachOperator("bci-reactor", Operators.lift(new BiFunction<Scannable, CoreSubscriber<? super Object>, CoreSubscriber<? super Object>>() { // from class: whatap.bci.reactor.HookReactive.7
                @Override // java.util.function.BiFunction
                public CoreSubscriber<? super Object> apply(Scannable scannable, CoreSubscriber<? super Object> coreSubscriber) {
                    TraceContext context;
                    try {
                        if (scannable instanceof Fuseable.ScalarCallable) {
                            return coreSubscriber;
                        }
                        Long l = (Long) coreSubscriber.currentContext().getOrDefault(TraceContext.WTP_TXID, (Object) null);
                        if (l == null || (context = TraceContextManager.getContext(l.longValue())) == null) {
                            return coreSubscriber;
                        }
                        context.thread = Thread.currentThread();
                        return new ContextLifter(coreSubscriber, scannable, null, context);
                    } catch (Exception e) {
                        Logger.println(e.getMessage(), e);
                        return coreSubscriber;
                    }
                }
            }));
        } catch (Throwable th) {
            Logger.println(th.getMessage(), th);
        }
    }
}
