Android Architect path 21 Responsive Programming RX Java thread transformation Principles

Bruine d'hiver 2021-09-15 05:51:33
android architect path responsive programming


Préface

SchedulerIncarnant l'idée d'une programmation réactive:AdoptionSchedulerChangement réalisé,Et peut se propager vers le bas.(Diffusion du changement)

Module de fonction de transformation de fil:
  1. Permettre l'exécution du Code dans différents Threads
  2. subscribeOn-Fil au moment de l'abonnement
  3. observeOn- Thread at receive
  4. Scheduler - Faire une transformation de fil
1.RxJava1 Transformation du fil
  1. SchedulerDispatcher
  2. OperatorInterface opérateur - opérateur
  3. liftOpérateur Central
Exemple

 Observable.
create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (!subscriber.isUnsubscribed()) {
Log.d(TAG, "currentThread:" + Thread.currentThread());
subscriber.onNext("test");
subscriber.onCompleted();
}
}
}).
subscribeOn(Schedulers.newThread()).
observeOn(AndroidSchedulers.mainThread()).
subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext:" + s + "currentThread:" + Thread.currentThread());
}
});
Copier le Code
Exécution

06-12 17:00:13.846 6227-6495/com.haocai.rxjavademo D/kpioneer: currentThread:Thread[RxNewThreadScheduler-1,5,main]
06-12 17:00:13.856 6227-6227/com.haocai.rxjavademo D/kpioneer: onNext:testcurrentThread:Thread[main,5,main]
Copier le Code
2.RxJava2 Transformation du fil
  1. SchedulerDispatcher
  2. AbstractObservableWithUpStreamClasse abstraite

 /*---------Pas de contre - pression---------*/
Observable.
create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
if (!emitter.isDisposed()) {
Log.d(TAG, "Observable currentThread:" + Thread.currentThread());
emitter.onNext("test");
emitter.onComplete();
}
}
}).
subscribeOn(Schedulers.newThread()).
observeOn(AndroidSchedulers.mainThread()).
subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String o) {
Log.d(TAG, "Observable onNext:" + o);
Log.d(TAG, "Observable currentThread:" + Thread.currentThread());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
/*---------Il y a une contre - pression.---------*/
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
if (!emitter.isCancelled()) {
Log.d(TAG, "Flowable currentThread:" + Thread.currentThread());
emitter.onNext("test");
emitter.onComplete();
}
}
}, BackpressureStrategy.DROP).
subscribeOn(Schedulers.newThread()).
observeOn(AndroidSchedulers.mainThread()).
subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String s) {
Log.d(TAG, "Flowable onNext:" + s);
Log.d(TAG, "Flowable currentThread:" + Thread.currentThread());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
Copier le Code

06-13 13:37:13.009 3063-3949/com.haocai.rxjavademo D/kpioneer: Observable currentThread:Thread[RxNewThreadScheduler-1,5,main]
06-13 13:37:13.019 3063-3063/com.haocai.rxjavademo D/kpioneer: Observable onNext:test
06-13 13:37:13.019 3063-3063/com.haocai.rxjavademo D/kpioneer: Observable currentThread:Thread[main,5,main]
06-13 13:37:13.019 3063-3950/com.haocai.rxjavademo D/kpioneer: Flowable currentThread:Thread[RxNewThreadScheduler-2,5,main]
06-13 13:37:13.029 3063-3063/com.haocai.rxjavademo D/kpioneer: Flowable onNext:test
06-13 13:37:13.029 3063-3063/com.haocai.rxjavademo D/kpioneer: Flowable currentThread:Thread[main,5,main]
Copier le Code
3.RxJava1 Scheduler Analyse du code source de l'expéditeur
  1. Scheduler: Classe abstraite
  2. Worker: La classe qui fait vraiment la programmation de thread
  3. Action0: Opérations effectuées dans le thread
  4. schedule: Comment programmer un thread ,Le paramètre d'entrée estAction0

public abstract class Scheduler {
public abstract Worker createWorker();
/** * Sequential Scheduler for executing actions on a single thread or event loop. * <p> * Unsubscribing the {@link Worker} cancels all outstanding work and allows resources cleanup. */
public abstract static class Worker implements Subscription {
/** * Schedules an Action for execution. * * @param action * Action to schedule * @return a subscription to be able to prevent or cancel the execution of the action */
public abstract Subscription schedule(Action0 action);
public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);
public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
return SchedulePeriodicHelper.schedulePeriodically(this, action,
initialDelay, period, unit, null);
}
public long now() {
return System.currentTimeMillis();
}
}
public long now() {
return System.currentTimeMillis();
}
@SuppressWarnings("unchecked")
public <S extends Scheduler & Subscription> S when(Func1<Observable<Observable<Completable>>, Completable> combine) {
return (S) new SchedulerWhen(combine, this);
}
}
Copier le Code
Processus d'ordonnancement des fils :
  1. Les entrées sont différentes Scheduler Pour utiliser différents fils

 public final Observable<T> subscribeOn(Scheduler scheduler) {
return subscribeOn(scheduler, !(this.onSubscribe instanceof OnSubscribeCreate));
}
Copier le Code

 public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, RxRingBuffer.SIZE);
}
Copier le Code
  1. AvecSchedulerCréationWorker Pour utiliser un vrai pool de Threads

NewThreadWorker Créer un pool de Threads


public class NewThreadWorker extends Scheduler.Worker implements Subscription {
private final ScheduledExecutorService executor;
volatile boolean isUnsubscribed;
/** The purge frequency in milliseconds. */
private static final String FREQUENCY_KEY = "rx.scheduler.jdk6.purge-frequency-millis";
/** Force the use of purge (true/false). */
private static final String PURGE_FORCE_KEY = "rx.scheduler.jdk6.purge-force";
private static final String PURGE_THREAD_PREFIX = "RxSchedulerPurge-";
private static final boolean SHOULD_TRY_ENABLE_CANCEL_POLICY;
/** The purge frequency in milliseconds. */
public static final int PURGE_FREQUENCY;
private static final ConcurrentHashMap<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor> EXECUTORS;
private static final AtomicReference<ScheduledExecutorService> PURGE;
/** * Improves performance of {@link #tryEnableCancelPolicy(ScheduledExecutorService)}. * Also, it works even for inheritance: {@link Method} of base class can be invoked on the instance of child class. */
private static volatile Object cachedSetRemoveOnCancelPolicyMethod;
/** * Possible value of {@link #cachedSetRemoveOnCancelPolicyMethod} which means that cancel policy is not supported. */
private static final Object SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED = new Object();
static {
EXECUTORS = new ConcurrentHashMap<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor>();
PURGE = new AtomicReference<ScheduledExecutorService>();
PURGE_FREQUENCY = Integer.getInteger(FREQUENCY_KEY, 1000);
// Forces the use of purge even if setRemoveOnCancelPolicy is available
final boolean purgeForce = Boolean.getBoolean(PURGE_FORCE_KEY);
final int androidApiVersion = PlatformDependent.getAndroidApiVersion();
// According to http://developer.android.com/reference/java/util/concurrent/ScheduledThreadPoolExecutor.html#setRemoveOnCancelPolicy(boolean)
// setRemoveOnCancelPolicy available since Android API 21
SHOULD_TRY_ENABLE_CANCEL_POLICY = !purgeForce
&& (androidApiVersion == ANDROID_API_VERSION_IS_NOT_ANDROID || androidApiVersion >= 21);
}
/** * Registers the given executor service and starts the purge thread if not already started. * <p>{@code public} visibility reason: called from other package(s) within RxJava * @param service a scheduled thread pool executor instance */
public static void registerExecutor(ScheduledThreadPoolExecutor service) {
do {
ScheduledExecutorService exec = PURGE.get();
if (exec != null) {
break;
}
exec = Executors.newScheduledThreadPool(1, new RxThreadFactory(PURGE_THREAD_PREFIX));
if (PURGE.compareAndSet(null, exec)) {
exec.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
purgeExecutors();
}
}, PURGE_FREQUENCY, PURGE_FREQUENCY, TimeUnit.MILLISECONDS);
break;
} else {
exec.shutdownNow();
}
} while (true);
EXECUTORS.putIfAbsent(service, service);
}
/** * Deregisters the executor service. * <p>{@code public} visibility reason: called from other package(s) within RxJava * @param service a scheduled thread pool executor instance */
public static void deregisterExecutor(ScheduledExecutorService service) {
EXECUTORS.remove(service);
}
/** Purges each registered executor and eagerly evicts shutdown executors. */
static void purgeExecutors() {
try {
// This prevents map.keySet to compile to a Java 8+ KeySetView return type
// and cause NoSuchMethodError on Java 6-7 runtimes.
Map<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor> map = EXECUTORS;
Iterator<ScheduledThreadPoolExecutor> it = map.keySet().iterator();
while (it.hasNext()) {
ScheduledThreadPoolExecutor exec = it.next();
if (!exec.isShutdown()) {
exec.purge();
} else {
it.remove();
}
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
RxJavaHooks.onError(t);
}
}
/** * Tries to enable the Java 7+ setRemoveOnCancelPolicy. * <p>{@code public} visibility reason: called from other package(s) within RxJava. * If the method returns false, the {@link #registerExecutor(ScheduledThreadPoolExecutor)} may * be called to enable the backup option of purging the executors. * @param executor the executor to call setRemoveOnCancelPolicy if available. * @return true if the policy was successfully enabled */
public static boolean tryEnableCancelPolicy(ScheduledExecutorService executor) {
if (SHOULD_TRY_ENABLE_CANCEL_POLICY) { // NOPMD
final boolean isInstanceOfScheduledThreadPoolExecutor = executor instanceof ScheduledThreadPoolExecutor;
Method methodToCall;
if (isInstanceOfScheduledThreadPoolExecutor) {
final Object localSetRemoveOnCancelPolicyMethod = cachedSetRemoveOnCancelPolicyMethod;
if (localSetRemoveOnCancelPolicyMethod == SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED) {
return false;
}
if (localSetRemoveOnCancelPolicyMethod == null) {
Method method = findSetRemoveOnCancelPolicyMethod(executor);
cachedSetRemoveOnCancelPolicyMethod = method != null
? method
: SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED;
methodToCall = method;
} else {
methodToCall = (Method) localSetRemoveOnCancelPolicyMethod;
}
} else {
methodToCall = findSetRemoveOnCancelPolicyMethod(executor);
}
if (methodToCall != null) {
try {
methodToCall.invoke(executor, true);
return true;
} catch (InvocationTargetException e) {
RxJavaHooks.onError(e);
} catch (IllegalAccessException e) {
RxJavaHooks.onError(e);
} catch (IllegalArgumentException e) {
RxJavaHooks.onError(e);
}
}
}
return false;
}
/** * Tries to find {@code "setRemoveOnCancelPolicy(boolean)"} method in the class of passed executor. * * @param executor whose class will be used to search for required method. * @return {@code "setRemoveOnCancelPolicy(boolean)"} {@link Method} * or {@code null} if required {@link Method} was not found. */
static Method findSetRemoveOnCancelPolicyMethod(ScheduledExecutorService executor) {
// The reason for the loop is to avoid NoSuchMethodException being thrown on JDK 6
// which is more costly than looping through ~70 methods.
for (final Method method : executor.getClass().getMethods()) {
if (method.getName().equals("setRemoveOnCancelPolicy")) {
final Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1 && parameterTypes[0] == Boolean.TYPE) {
return method;
}
}
}
return null;
}
/* package */
public NewThreadWorker(ThreadFactory threadFactory) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
// Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
boolean cancelSupported = tryEnableCancelPolicy(exec);
if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
registerExecutor((ScheduledThreadPoolExecutor)exec);
}
executor = exec;
}
@Override
public Subscription schedule(final Action0 action) {
return schedule(action, 0, null);
}
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
if (isUnsubscribed) {
return Subscriptions.unsubscribed();
}
return scheduleActual(action, delayTime, unit);
}
/** * Schedules the given action by wrapping it into a ScheduledAction on the * underlying ExecutorService, returning the ScheduledAction. * @param action the action to wrap and schedule * @param delayTime the delay in execution * @param unit the time unit of the delay * @return the wrapper ScheduledAction */
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) {
Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
parent.add(run);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) {
Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
parent.add(run);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
@Override
public void unsubscribe() {
isUnsubscribed = true;
executor.shutdownNow();
deregisterExecutor(executor);
}
@Override
public boolean isUnsubscribed() {
return isUnsubscribed;
}
}
Copier le Code
  1. Actions spécifiques entrantes Action0
  2. Adoptionscheduler Méthodes pour réaliser l'ordonnancement

ScheduledAction Moyenne action.call(); Effectuer des opérations spécifiques


public final class ScheduledAction extends AtomicReference<Thread> implements Runnable, Subscription {
/** */
private static final long serialVersionUID = -3962399486978279857L;
final SubscriptionList cancel;
final Action0 action;
public ScheduledAction(Action0 action) {
this.action = action;
this.cancel = new SubscriptionList();
}
public ScheduledAction(Action0 action, CompositeSubscription parent) {
this.action = action;
this.cancel = new SubscriptionList(new Remover(this, parent));
}
public ScheduledAction(Action0 action, SubscriptionList parent) {
this.action = action;
this.cancel = new SubscriptionList(new Remover2(this, parent));
}
@Override
public void run() {
try {
lazySet(Thread.currentThread());
action.call();
} catch (OnErrorNotImplementedException e) {
signalError(new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e));
} catch (Throwable e) {
signalError(new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e));
} finally {
unsubscribe();
}
}
void signalError(Throwable ie) {
RxJavaHooks.onError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
@Override
public boolean isUnsubscribed() {
return cancel.isUnsubscribed();
}
@Override
public void unsubscribe() {
if (!cancel.isUnsubscribed()) {
cancel.unsubscribe();
}
}
/** * Adds a general Subscription to this {@code ScheduledAction} that will be unsubscribed * if the underlying {@code action} completes or the this scheduled action is cancelled. * * @param s the Subscription to add */
public void add(Subscription s) {
cancel.add(s);
}
/** * Adds the given Future to the unsubscription composite in order to support * cancelling the underlying task in the executor framework. * @param f the future to add */
public void add(final Future<?> f) {
cancel.add(new FutureCompleter(f));
}
/** * Adds a parent {@link CompositeSubscription} to this {@code ScheduledAction} so when the action is * cancelled or terminates, it can remove itself from this parent. * * @param parent * the parent {@code CompositeSubscription} to add */
public void addParent(CompositeSubscription parent) {
cancel.add(new Remover(this, parent));
}
/** * Adds a parent {@link CompositeSubscription} to this {@code ScheduledAction} so when the action is * cancelled or terminates, it can remove itself from this parent. * * @param parent * the parent {@code CompositeSubscription} to add */
public void addParent(SubscriptionList parent) {
cancel.add(new Remover2(this, parent));
}
/** * Cancels the captured future if the caller of the call method * is not the same as the runner of the outer ScheduledAction to * prevent unnecessary self-interrupting if the unsubscription * happens from the same thread. */
final class FutureCompleter implements Subscription {
private final Future<?> f;
FutureCompleter(Future<?> f) {
this.f = f;
}
@Override
public void unsubscribe() {
if (ScheduledAction.this.get() != Thread.currentThread()) {
f.cancel(true);
} else {
f.cancel(false);
}
}
@Override
public boolean isUnsubscribed() {
return f.isCancelled();
}
}
/** Remove a child subscription from a composite when unsubscribing. */
static final class Remover extends AtomicBoolean implements Subscription {
/** */
private static final long serialVersionUID = 247232374289553518L;
final ScheduledAction s;
final CompositeSubscription parent;
public Remover(ScheduledAction s, CompositeSubscription parent) {
this.s = s;
this.parent = parent;
}
@Override
public boolean isUnsubscribed() {
return s.isUnsubscribed();
}
@Override
public void unsubscribe() {
if (compareAndSet(false, true)) {
parent.remove(s);
}
}
}
/** Remove a child subscription from a composite when unsubscribing. */
static final class Remover2 extends AtomicBoolean implements Subscription {
/** */
private static final long serialVersionUID = 247232374289553518L;
final ScheduledAction s;
final SubscriptionList parent;
public Remover2(ScheduledAction s, SubscriptionList parent) {
this.s = s;
this.parent = parent;
}
@Override
public boolean isUnsubscribed() {
return s.isUnsubscribed();
}
@Override
public void unsubscribe() {
if (compareAndSet(false, true)) {
parent.remove(s);
}
}
}
}
Copier le Code
rxJava1: rxandroidDansScheduler

AdoptionHandlerEtLooper Pour implémenter l'exécution dans le fil principal


/** Android-specific Schedulers. */
public final class AndroidSchedulers {
private static final AtomicReference<AndroidSchedulers> INSTANCE = new AtomicReference<>();
private final Scheduler mainThreadScheduler;
private static AndroidSchedulers getInstance() {
for (;;) {
AndroidSchedulers current = INSTANCE.get();
if (current != null) {
return current;
}
current = new AndroidSchedulers();
if (INSTANCE.compareAndSet(null, current)) {
return current;
}
}
}
private AndroidSchedulers() {
RxAndroidSchedulersHook hook = RxAndroidPlugins.getInstance().getSchedulersHook();
Scheduler main = hook.getMainThreadScheduler();
if (main != null) {
mainThreadScheduler = main;
} else {
mainThreadScheduler = new LooperScheduler(Looper.getMainLooper());
}
}
/** A {@link Scheduler} which executes actions on the Android UI thread. */
public static Scheduler mainThread() {
return getInstance().mainThreadScheduler;
}
/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new LooperScheduler(looper);
}
/** * Resets the current {@link AndroidSchedulers} instance. * This will re-init the cached schedulers on the next usage, * which can be useful in testing. */
@Experimental
public static void reset() {
INSTANCE.set(null);
}
}
Copier le Code

**

class LooperScheduler extends Scheduler {
private final Handler handler;
LooperScheduler(Looper looper) {
handler = new Handler(looper);
}
LooperScheduler(Handler handler) {
this.handler = handler;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
static class HandlerWorker extends Worker {
private final Handler handler;
private final RxAndroidSchedulersHook hook;
private volatile boolean unsubscribed;
HandlerWorker(Handler handler) {
this.handler = handler;
this.hook = RxAndroidPlugins.getInstance().getSchedulersHook();
}
@Override
public void unsubscribe() {
unsubscribed = true;
handler.removeCallbacksAndMessages(this /* token */);
}
@Override
public boolean isUnsubscribed() {
return unsubscribed;
}
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (unsubscribed) {
return Subscriptions.unsubscribed();
}
action = hook.onSchedule(action);
ScheduledAction scheduledAction = new ScheduledAction(action, handler);
Message message = Message.obtain(handler, scheduledAction);
message.obj = this; // Used as token for unsubscription operation.
handler.sendMessageDelayed(message, unit.toMillis(delayTime));
if (unsubscribed) {
handler.removeCallbacks(scheduledAction);
return Subscriptions.unsubscribed();
}
return scheduledAction;
}
@Override
public Subscription schedule(final Action0 action) {
return schedule(action, 0, TimeUnit.MILLISECONDS);
}
}
static final class ScheduledAction implements Runnable, Subscription {
private final Action0 action;
private final Handler handler;
private volatile boolean unsubscribed;
ScheduledAction(Action0 action, Handler handler) {
this.action = action;
this.handler = handler;
}
@Override public void run() {
try {
action.call();
} catch (Throwable e) {
// nothing to do but print a System error as this is fatal and there is nowhere else to throw this
IllegalStateException ie;
if (e instanceof OnErrorNotImplementedException) {
ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e);
} else {
ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
}
RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
}
@Override public void unsubscribe() {
unsubscribed = true;
handler.removeCallbacks(this);
}
@Override public boolean isUnsubscribed() {
return unsubscribed;
}
}
}
Copier le Code

4.RxJava2 Scheduler Analyse du code source de l'expéditeur


public abstract class Scheduler {
static final long CLOCK_DRIFT_TOLERANCE_NANOSECONDS;
static {
CLOCK_DRIFT_TOLERANCE_NANOSECONDS = TimeUnit.MINUTES.toNanos(
Long.getLong("rx2.scheduler.drift-tolerance", 15));
}
public static long clockDriftTolerance() {
return CLOCK_DRIFT_TOLERANCE_NANOSECONDS;
}
@NonNull
public abstract io.reactivex.Scheduler.Worker createWorker();
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
public void start() {
}
public void shutdown() {
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final io.reactivex.Scheduler.Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
io.reactivex.Scheduler.DisposeTask task = new io.reactivex.Scheduler.DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
@NonNull
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
final io.reactivex.Scheduler.Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
io.reactivex.Scheduler.PeriodicDirectTask periodicTask = new io.reactivex.Scheduler.PeriodicDirectTask(decoratedRun, w);
Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
if (d == EmptyDisposable.INSTANCE) {
return d;
}
return periodicTask;
}
@SuppressWarnings("unchecked")
@NonNull
public <S extends io.reactivex.Scheduler & Disposable> S when(@NonNull Function<Flowable<Flowable<Completable>>, Completable> combine) {
return (S) new SchedulerWhen(combine, this);
}
public abstract static class Worker implements Disposable {
/** * Schedules a Runnable for execution without any time delay. * * <p>The default implementation delegates to {@link #schedule(Runnable, long, TimeUnit)}. * * @param run * Runnable to schedule * @return a Disposable to be able to unsubscribe the action (cancel it if not executed) */
@NonNull
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
/** * Schedules an Runnable for execution at some point in the future specified by a time delay * relative to the current time. * <p> * Note to implementors: non-positive {@code delayTime} should be regarded as non-delayed schedule, i.e., * as if the {@link #schedule(Runnable)} was called. * * @param run * the Runnable to schedule * @param delay * time to "wait" before executing the action; non-positive values indicate an non-delayed * schedule * @param unit * the time unit of {@code delayTime} * @return a Disposable to be able to unsubscribe the action (cancel it if not executed) */
@NonNull
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
@NonNull
public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
final SequentialDisposable first = new SequentialDisposable();
final SequentialDisposable sd = new SequentialDisposable(first);
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
final long periodInNanoseconds = unit.toNanos(period);
final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);
Disposable d = schedule(new io.reactivex.Scheduler.Worker.PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
periodInNanoseconds), initialDelay, unit);
if (d == EmptyDisposable.INSTANCE) {
return d;
}
first.replace(d);
return sd;
}
/** * Returns the 'current time' of the Worker in the specified time unit. * @param unit the time unit * @return the 'current time' * @since 2.0 */
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/** * Holds state and logic to calculate when the next delayed invocation * of this task has to happen (accounting for clock drifts). */
final class PeriodicTask implements Runnable, SchedulerRunnableIntrospection {
@NonNull
final Runnable decoratedRun;
@NonNull
final SequentialDisposable sd;
final long periodInNanoseconds;
long count;
long lastNowNanoseconds;
long startInNanoseconds;
PeriodicTask(long firstStartInNanoseconds, @NonNull Runnable decoratedRun,
long firstNowNanoseconds, @NonNull SequentialDisposable sd, long periodInNanoseconds) {
this.decoratedRun = decoratedRun;
this.sd = sd;
this.periodInNanoseconds = periodInNanoseconds;
lastNowNanoseconds = firstNowNanoseconds;
startInNanoseconds = firstStartInNanoseconds;
}
@Override
public void run() {
decoratedRun.run();
if (!sd.isDisposed()) {
long nextTick;
long nowNanoseconds = now(TimeUnit.NANOSECONDS);
// If the clock moved in a direction quite a bit, rebase the repetition period
if (nowNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS < lastNowNanoseconds
|| nowNanoseconds >= lastNowNanoseconds + periodInNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS) {
nextTick = nowNanoseconds + periodInNanoseconds;
/* * Shift the start point back by the drift as if the whole thing * started count periods ago. */
startInNanoseconds = nextTick - (periodInNanoseconds * (++count));
} else {
nextTick = startInNanoseconds + (++count * periodInNanoseconds);
}
lastNowNanoseconds = nowNanoseconds;
long delay = nextTick - nowNanoseconds;
sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
}
}
@Override
public Runnable getWrappedRunnable() {
return this.decoratedRun;
}
}
}
static final class PeriodicDirectTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
@NonNull
final Runnable run;
@NonNull
final io.reactivex.Scheduler.Worker worker;
volatile boolean disposed;
PeriodicDirectTask(@NonNull Runnable run, @NonNull io.reactivex.Scheduler.Worker worker) {
this.run = run;
this.worker = worker;
}
@Override
public void run() {
if (!disposed) {
try {
run.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
worker.dispose();
throw ExceptionHelper.wrapOrThrow(ex);
}
}
}
@Override
public void dispose() {
disposed = true;
worker.dispose();
}
@Override
public boolean isDisposed() {
return disposed;
}
@Override
public Runnable getWrappedRunnable() {
return run;
}
}
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
@NonNull
final Runnable decoratedRun;
@NonNull
final io.reactivex.Scheduler.Worker w;
@Nullable
Thread runner;
DisposeTask(@NonNull Runnable decoratedRun, @NonNull io.reactivex.Scheduler.Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
w.dispose();
}
}
@Override
public boolean isDisposed() {
return w.isDisposed();
}
@Override
public Runnable getWrappedRunnable() {
return this.decoratedRun;
}
}
}
Copier le Code
  1. Passer dans un autreScheduler Pour utiliser différents fils
  2. AvecSchedulerCréationWorker Pour utiliser un vrai pool de Threads

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable run) {
return schedule(run, 0, null);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null);
}
/** * Schedules the given runnable on the underlying executor directly and * returns its future wrapped into a Disposable. * @param run the Runnable to execute in a delayed fashion * @param delayTime the delay amount * @param unit the delay time unit * @return the ScheduledRunnable instance */
public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
try {
Future<?> f;
if (delayTime <= 0L) {
f = executor.submit(task);
} else {
f = executor.schedule(task, delayTime, unit);
}
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}
/** * Schedules the given runnable periodically on the underlying executor directly * and returns its future wrapped into a Disposable. * @param run the Runnable to execute in a periodic fashion * @param initialDelay the initial delay amount * @param period the repeat period amount * @param unit the time unit for both the initialDelay and period * @return the ScheduledRunnable instance */
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
if (period <= 0L) {
InstantPeriodicTask periodicWrapper = new InstantPeriodicTask(decoratedRun, executor);
try {
Future<?> f;
if (initialDelay <= 0L) {
f = executor.submit(periodicWrapper);
} else {
f = executor.schedule(periodicWrapper, initialDelay, unit);
}
periodicWrapper.setFirst(f);
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
return periodicWrapper;
}
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
try {
Future<?> f = executor.scheduleAtFixedRate(task, initialDelay, period, unit);
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}
/** * Wraps the given runnable into a ScheduledRunnable and schedules it * on the underlying ScheduledExecutorService. * <p>If the schedule has been rejected, the ScheduledRunnable.wasScheduled will return * false. * @param run the runnable instance * @param delayTime the time to delay the execution * @param unit the time unit * @param parent the optional tracker parent to add the created ScheduledRunnable instance to before it gets scheduled * @return the ScheduledRunnable instance */
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
@Override
public void dispose() {
if (!disposed) {
disposed = true;
executor.shutdownNow();
}
}
/** * Shuts down the underlying executor in a non-interrupting fashion. */
public void shutdown() {
if (!disposed) {
disposed = true;
executor.shutdown();
}
}
@Override
public boolean isDisposed() {
return disposed;
}
}
Copier le Code
  1. Actions spécifiques entrantes Runnable
  2. Adoptionschedule Méthodes pour réaliser l'ordonnancement
RxJava2: rxandroidDansScheduler

AvecRxJava1 Adoption similaire HandlerEtLooper Pour implémenter l'exécution dans le fil principal


public final class AndroidSchedulers {
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new HandlerScheduler(new Handler(looper));
}
private AndroidSchedulers() {
throw new AssertionError("No instances.");
}
}
Copier le Code

**

final class HandlerScheduler extends Scheduler {
private final Handler handler;
HandlerScheduler(Handler handler) {
this.handler = handler;
}
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
handler.postDelayed(scheduled, unit.toMillis(delay));
return scheduled;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
private volatile boolean disposed;
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacksAndMessages(this /* token */);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed;
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
RxJavaPlugins.onError(t);
}
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacks(this);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
}
Copier le Code

5.RxJava1 Scheduler Imitation de thread Transform

Objet principal :
Switcher Changeur de fil
  1. Pour le changement de fil
  2. Il y a uncreateWorkerMéthodes

/** * Created by Xionghu on 2018/6/14. * Desc: Pour le changement de fil */
public abstract class Switcher {
public abstract Worker createWorker();
public static abstract class Worker implements Calling{
public abstract Calling switches(Action0 action0);
}
}
Copier le Code
Worker
  1. La classe qui effectue réellement la transformation du fil
  2. Adoptionswitches La méthode effectue la transformation
NewThreadSwitcher
  1. Passer au nouveau thread Switcher
  2. RéalisationcreateWorkerMéthodes

/** * Created by Xionghu on 2018/6/14. * Desc: Nouveau thread switcher */
public class NewThreadSwitcher extends Switcher {
@Override
public Worker createWorker() {
return new NewThreadWorker();
}
}
Copier le Code
NewThreadWorker
  1. Il y a un pool de Threads avec un seul thread
  2. Pour implémenter le thread de commutation switchesMéthodes
  3. Pour vraiment fonctionner Runnable Paquet perdu dans le pool de Threads exécution

/** * Created by Xionghu on 2018/6/14. * Desc: Classe de travail pour le nouveau thread */
public class NewThreadWorker extends Switcher.Worker {
//newScheduledThreadPool :Créer un pool de Threads de taille illimitée.Ce pool de Threads prend en charge les exigences d'exécution des tâches programmées et cycliques.
private final ExecutorService mExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(@NonNull Runnable runnable) {
return new Thread(runnable, " NewThreadWorker");
}
});
private volatile boolean mIsUnCalled;
@Override
public void unCall() {
mIsUnCalled = true;
}
@Override
public boolean isUnCalled() {
return mIsUnCalled;
}
@Override
public Calling switches(Action0 action0) {
SwitcherAction switcherAction = new SwitcherAction(action0);
mExecutor.submit(switcherAction);
return switcherAction;
}
private static class SwitcherAction implements Runnable, Calling {
private final Action0 action0;
private volatile boolean mIsUnCalled;
public SwitcherAction(Action0 action0) {
this.action0 = action0;
}
@Override
public void unCall() {
mIsUnCalled = true;
}
@Override
public boolean isUnCalled() {
return mIsUnCalled;
}
@Override
public void run() {
action0.call();
}
}
}
Copier le Code
LooperSwitcher

 Android Pour passer à un thread LooperMoyenne
/** * Created by Xionghu on 2018/6/14. * Desc: PourAndroidMoyenneLooperDeSwitcher */
public class LooperSwitcher extends Switcher {
private Handler mHandler;
public LooperSwitcher(Looper looper) {
mHandler = new Handler(looper);
}
@Override
public Worker createWorker() {
return new HandlerWorker(mHandler);
}
}
Copier le Code
HandlerWorker

Envoyer l'action spécifique à la LooperExécution intermédiaire

**

import android.os.Handler;
import android.os.Message;
/**
* Created by Xionghu on 2018/6/14.
* Desc: PourAndroid DeWorker
*/
public class HandlerWorker extends Switcher.Worker {
private final Handler mHandler;
private volatile boolean mIsUnCalled;
public HandlerWorker(Handler mHandler) {
this.mHandler = mHandler;
}
@Override
public void unCall() {
mIsUnCalled = true;
mHandler.removeCallbacksAndMessages(this);
}
@Override
public boolean isUnCalled() {
return mIsUnCalled;
}
@Override
public Calling switches(Action0 action0) {
SwitcherAction switcherAction = new SwitcherAction(action0, mHandler);
Message message = Message.obtain(mHandler, switcherAction);
message.obj = this;
mHandler.sendMessage(message);
return switcherAction;
}
private static class SwitcherAction implements Runnable, Calling {
private final Action0 action0;
private final Handler handler;
private volatile boolean mIsUnCalled;
public SwitcherAction(Action0 action0, Handler handler) {
this.action0 = action0;
this.handler = handler;
}
@Override
public void unCall() {
mIsUnCalled = true;
handler.removeCallbacks(this);
}
@Override
public boolean isUnCalled() {
return mIsUnCalled;
}
@Override
public void run() {
action0.call();
}
}
}
Copier le Code

6.RxJava2 Scheduler Imitation de thread Transform

Switcher Changeur de fil
  1. Pour le changement de fil
  2. Il y a uncreateWorkerMéthodes
  3. Contient en soi un switchesMéthodes(AvecRxJava1Il y a une différence.)

/** * Created by Xionghu on 2018/6/14. * Desc: Classes abstraites pour la commutation de fil */
public abstract class Switcher {
public abstract Worker createWorker();
public Release switches(final Runnable runnable) {
Worker worker = createWorker();
worker.switches(new Runnable() {
@Override
public void run() {
runnable.run();
}
});
return worker;
}
public static abstract class Worker implements Release {
public abstract Release switches(Runnable runnable);
}
}
Copier le Code
Worker
  1. La classe qui effectue réellement la transformation du fil
  2. Adoptionswitches La méthode effectue la transformation
NewThreadSwitcher
  1. Passer au nouveau thread Switcher
  2. RéalisationcreateWorkerMéthodes

/** * Created by Xionghu on 2018/6/14. * Desc: Nouveau thread switcher */
public class NewThreadSwitcher extends Switcher {
@Override
public Worker createWorker() {
return new NewThreadWorker();
}
}
Copier le Code
NewThreadWorker
  1. Il y a un pool de Threads avec un seul thread
  2. Pour implémenter le thread de commutation switchesMéthodes
  3. Pour vraiment fonctionner Runnable Paquet perdu dans le pool de Threads exécution

import android.support.annotation.NonNull;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/** * Created by Xionghu on 2018/6/14. * Desc: Classe de travail pour le nouveau thread */
public class NewThreadWorker extends Switcher.Worker {
private final ExecutorService mExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(@NonNull Runnable runnable) {
return new Thread(runnable, "NewThreadWorker");
}
});
private volatile boolean mIsReleased;
@Override
public boolean isReleased() {
return mIsReleased;
}
@Override
public void release() {
mIsReleased = true;
}
@Override
public Release switches(Runnable runnable) {
SwitcherAction switcherAction = new SwitcherAction(runnable);
mExecutor.submit((Callable<Object>) switcherAction);
return switcherAction;
}
private static class SwitcherAction implements Runnable, Callable<Object>, Release {
private final Runnable mRunnable;
private volatile boolean mIsReleased;
public SwitcherAction(Runnable mRunnable) {
this.mRunnable = mRunnable;
}
@Override
public boolean isReleased() {
return mIsReleased;
}
@Override
public void release() {
mIsReleased = true;
}
@Override
public void run() {
mRunnable.run();
}
@Override
public Object call() throws Exception {
run();
return null;
}
}
}
Copier le Code
LooperSwitcher

Android Pour passer à un thread LooperMoyenne

HandlerWorker

Envoyer l'action spécifique à la LooperExécution intermédiaire

7.RxJava1 subscribeOn Analyse des principes

  1. AdoptionOnSubscribe Pour faire le principe
  2. UtilisationScheduler Placer l'action émise dans le fil pour l'exécution

 public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
}
Copier le Code

**

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
final boolean requestOn;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) {
this.scheduler = scheduler;
this.source = source;
this.requestOn = requestOn;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
subscriber.add(parent);
subscriber.add(inner);
inner.schedule(parent);
}
static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 {
final Subscriber<? super T> actual;
final boolean requestOn;
final Worker worker;
Observable<T> source;
Thread t;
SubscribeOnSubscriber(Subscriber<? super T> actual, boolean requestOn, Worker worker, Observable<T> source) {
this.actual = actual;
this.requestOn = requestOn;
this.worker = worker;
this.source = source;
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
actual.onError(e);
} finally {
worker.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
actual.onCompleted();
} finally {
worker.unsubscribe();
}
}
@Override
public void call() {
Observable<T> src = source;
source = null;
t = Thread.currentThread();
src.unsafeSubscribe(this);
}
@Override
public void setProducer(final Producer p) {
actual.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread() || !requestOn) {
p.request(n);
} else {
worker.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
}
Copier le Code

Le mécanisme de l'Agence a été utilisé

8.RxJava2 subscribeOn Analyse des principes

8.1.RxJava2(Pas de contre - pression) subscribeOn

**

 @CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
Copier le Code

**

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
}
Copier le Code
  1. SuccessionAbstractObservableWithUpstream
  2. RéalisationsubscribeActualMéthodes
  3. UtilisationScheduler Mettre l'action d'envoi dans le thread pour l'exécution
8.2.RxJava2(Il y a une contre - pression.) subscribeOn

**

 @CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.CUSTOM)
@Experimental
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, requestOn));
}
Copier le Code

**

public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T> {
final Scheduler scheduler;
final boolean nonScheduledRequests;
public FlowableSubscribeOn(Flowable<T> source, Scheduler scheduler, boolean nonScheduledRequests) {
super(source);
this.scheduler = scheduler;
this.nonScheduledRequests = nonScheduledRequests;
}
@Override
public void subscribeActual(final Subscriber<? super T> s) {
Scheduler.Worker w = scheduler.createWorker();
final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);
s.onSubscribe(sos);
w.schedule(sos);
}
static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
implements FlowableSubscriber<T>, Subscription, Runnable {
private static final long serialVersionUID = 8094547886072529208L;
final Subscriber<? super T> actual;
final Scheduler.Worker worker;
final AtomicReference<Subscription> s;
final AtomicLong requested;
final boolean nonScheduledRequests;
Publisher<T> source;
SubscribeOnSubscriber(Subscriber<? super T> actual, Scheduler.Worker worker, Publisher<T> source, boolean requestOn) {
this.actual = actual;
this.worker = worker;
this.source = source;
this.s = new AtomicReference<Subscription>();
this.requested = new AtomicLong();
this.nonScheduledRequests = !requestOn;
}
@Override
public void run() {
lazySet(Thread.currentThread());
Publisher<T> src = source;
source = null;
src.subscribe(this);
}
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.setOnce(this.s, s)) {
long r = requested.getAndSet(0L);
if (r != 0L) {
requestUpstream(r, s);
}
}
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
worker.dispose();
}
@Override
public void onComplete() {
actual.onComplete();
worker.dispose();
}
@Override
public void request(final long n) {
if (SubscriptionHelper.validate(n)) {
Subscription s = this.s.get();
if (s != null) {
requestUpstream(n, s);
} else {
BackpressureHelper.add(requested, n);
s = this.s.get();
if (s != null) {
long r = requested.getAndSet(0L);
if (r != 0L) {
requestUpstream(r, s);
}
}
}
}
}
void requestUpstream(final long n, final Subscription s) {
if (nonScheduledRequests || Thread.currentThread() == get()) {
s.request(n);
} else {
worker.schedule(new Request(s, n));
}
}
@Override
public void cancel() {
SubscriptionHelper.cancel(s);
worker.dispose();
}
static final class Request implements Runnable {
private final Subscription s;
private final long n;
Request(Subscription s, long n) {
this.s = s;
this.n = n;
}
@Override
public void run() {
s.request(n);
}
}
}
}
Copier le Code
  1. SuccessionAbstractFlowableWithUpstream
  2. RéalisationsubscribeActualMéthodes
  3. UtilisationScheduler Placer l'action émise dans le fil pour l'exécution

9. RxJava1 subscribeOnImitation

**

 public final Caller<T> callOn(Switcher switcher) {
return create(new OperatorCallOn<>(switcher, this));
}
Copier le Code

**

/**
* Created by Xionghu on 2018/6/14.
* Desc: PourcallOnDeOnCall
*/
public class OperatorCallOn<T> implements Caller.OnCall<T> {
private final Switcher switcher;
private final Caller<T> tCaller;
public OperatorCallOn(Switcher switcher, Caller<T> tCaller) {
this.switcher = switcher;
this.tCaller = tCaller;
}
@Override
public void call(final Receiver<T> tReceiver) {
Switcher.Worker worker = switcher.createWorker();
worker.switches(new Action0() {
@Override
public void call() {
Receiver<T> tReceiver1 = new Receiver<T>() {
@Override
public void onCompleted() {
tReceiver.onCompleted();
}
@Override
public void onError(Throwable t) {
tReceiver.onError(t);
}
@Override
public void onReceive(T t) {
tReceiver.onReceive(t);
}
};
tCaller.call(tReceiver1);
}
});
}
}
Copier le Code
PourcallOnDeOnCall
  1. Tenir l'originalCallerEtSwitcher
  2. Créer un nouveauReceiver Envelopper l'ancien fil perdu
Exécution

**

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import com.haocai.mylibrary.rxJava1.Caller;
import com.haocai.mylibrary.rxJava1.NewThreadSwitcher;
import com.haocai.mylibrary.rxJava1.Receiver;
import com.haocai.rxjavademo.R;
import butterknife.ButterKnife;
import butterknife.OnClick;
/**
* Created by Xionghu on 2018/6/11.
* Desc: .RxJava1 subscribeOnImitation
*/
public class Lesson3_2Activity extends AppCompatActivity {
public static final String TAG = "kpioneer";
@Override
protected void onCreate(final Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_custom_test);
ButterKnife.bind(this);
}
@OnClick(R.id.testDo)
public void onViewClicked() {
Caller.
create(new Caller.OnCall<String>() {
@Override
public void call(Receiver<String> stringReceiver) {
if (!stringReceiver.isUnCalled()) {
stringReceiver.onReceive("test");
stringReceiver.onCompleted();
}
}
}).
callOn(new NewThreadSwitcher()).
call(new Receiver<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onReceive(String o) {
Log.d(TAG, "onReceive:" + o);
Log.d(TAG, "currentThread:" + Thread.currentThread());
}
});
}
}
Copier le Code

**

06-15 14:19:02.219 17153-17366/com.haocai.rxjavademo D/kpioneer: onReceive:test
06-15 14:19:02.219 17153-17366/com.haocai.rxjavademo D/kpioneer: currentThread:Thread[ NewThreadWorker,5,main]
Copier le Code

10. RxJava2 subscribeOnImitation

10.1RxJava2(Pas de contre - pression)

**

 public Caller<T> callOn(Switcher switcher) {
return new CallerCallOn<>(this, switcher);
}
Copier le Code

**

/**
* Created by Xionghu on 2018/6/15.
* Desc: PourcallOn
*/
public class CallerCallOn<T> extends CallerWithUpstream<T, T> {
private Switcher mSwitcher;
public CallerCallOn(Caller<T> source, Switcher mSwitcher) {
super(source);
this.mSwitcher = mSwitcher;
}
@Override
protected void callActual(Callee<T> callee) {
final CallOnCallee<T> tCallOnCallee = new CallOnCallee<>(callee);
callee.onCall(tCallOnCallee);
mSwitcher.switches(new Runnable() {
@Override
public void run() {
source.call(tCallOnCallee);
}
});
}
private static final class CallOnCallee<T> implements Callee<T>, Release {
private final Callee<T> callee;
public CallOnCallee(Callee<T> callee) {
this.callee = callee;
}
@Override
public void onCall(Release release) {
}
@Override
public void onReceive(T t) {
callee.onReceive(t);
}
@Override
public void onCompleted() {
callee.onCompleted();
}
@Override
public void onError(Throwable t) {
callee.onError(t);
}
@Override
public boolean isReleased() {
return false;
}
@Override
public void release() {
}
}
}
Copier le Code
CallerCallOn
  1. DeCallerWithUpstream
  2. Tenir l'originalCallerEtSwitcher
  3. Créer un nouveauCallee Envelopper l'ancien fil perdu
10.2RxJava2(Il y a une contre - pression.)

**

 public Telephoner<T> callOn(Switcher switcher) {
return new TelephonerCallOn<>(this, switcher);
}
Copier le Code

**

import com.haocai.mylibrary.rxJava2.Switcher;
import java.util.concurrent.atomic.AtomicLong;
/**
* Created by Xionghu on 2018/6/15.
* Desc:PourcallOn
*/
public class TelephonerCallOn<T> extends TelephonerWithUpstream<T, T> {
private final Switcher mSwitcher;
public TelephonerCallOn(Telephoner<T> source, Switcher switcher) {
super(source);
mSwitcher = switcher;
}
@Override
protected void callActual(Receiver<T> receiver) {
final CallOnReceiver<T> tCallOnReceiver = new CallOnReceiver<>(receiver);
receiver.onCall(tCallOnReceiver);
mSwitcher.switches(new Runnable() {
@Override
public void run() {
source.call(tCallOnReceiver);
}
});
}
private static final class CallOnReceiver<T> extends AtomicLong implements Receiver<T>, Drop {
private final Receiver<T> mReceiver;
public CallOnReceiver(Receiver<T> receiver) {
mReceiver = receiver;
}
@Override
public void request(long n) {
BackpressureHelper.add(this, n);
}
@Override
public void drop() {
}
@Override
public void onCall(Drop d) {
mReceiver.onCall(d);
}
@Override
public void onReceive(T t) {
if (get() != 0) {
mReceiver.onReceive(t);
BackpressureHelper.produced(this, 1);
}
}
@Override
public void onError(Throwable t) {
mReceiver.onError(t);
}
@Override
public void onCompleted() {
mReceiver.onCompleted();
}
}
}
Copier le Code
TelephonerCallOn
  1. DeTelephonerWithUpstream
  2. Tenir l'originalTelephonerEtSwitcher
  3. Créer un nouveauReceiver Envelopper l'ancien fil perdu
10.3 Exécution

**

/**
* Created by Xionghu on 2018/6/11.
* Desc: .RxJava2 subscribeOnImitation
*/
public class Lesson3_3Activity extends AppCompatActivity {
public static final String TAG = "kpioneer";
@Override
protected void onCreate(final Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_custom_test);
ButterKnife.bind(this);
}
@OnClick(R.id.testDo)
public void onViewClicked() {
/*---------Pas de contre - pression---------*/
Caller.
create(new CallerOnCall<String>() {
@Override
public void call(CallerEmitter<String> callerEmitter) {
callerEmitter.onReceive("test");
callerEmitter.onCompleted();
}
}).
callOn(new NewThreadSwitcher()).
call(new Callee<String>() {
@Override
public void onCall(Release release) {
}
@Override
public void onReceive(String string) {
Log.d(TAG, "Pas de contre - pression:onReceive:" + string);
Log.d(TAG, "Pas de contre - pression:currentThread:" + Thread.currentThread());
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable t) {
}
});
/*---------Il y a une contre - pression.---------*/
Telephoner.
create(new TelephonerOnCall<String>() {
@Override
public void call(TelephonerEmitter<String> telephonerEmitter) {
telephonerEmitter.onReceive("test");
telephonerEmitter.onCompleted();
}
}).
callOn(new NewThreadSwitcher()).
call(new Receiver<String>() {
@Override
public void onCall(Drop d) {
d.request(Long.MAX_VALUE);
}
@Override
public void onReceive(String s) {
Log.d(TAG, "Il y a une contre - pression.:onReceive:" + s);
Log.d(TAG, "Il y a une contre - pression.:currentThread:" + Thread.currentThread());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
});
}
}
Copier le Code
06-15 16:13:27.002 813-1150/com.haocai.rxjavademo D/kpioneer: Pas de contre - pression:onReceive:test
06-15 16:13:27.003 813-1150/com.haocai.rxjavademo D/kpioneer: Pas de contre - pression:currentThread:Thread[NewThreadWorker,5,main]
06-15 16:13:27.011 813-1151/com.haocai.rxjavademo D/kpioneer: Il y a une contre - pression.:onReceive:test
06-15 16:13:27.011 813-1151/com.haocai.rxjavademo D/kpioneer: Il y a une contre - pression.:currentThread:Thread[NewThreadWorker,5,main]
Copier le Code

11 RxJava1 observeOnAnalyse des principes

**

 public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
Copier le Code

**

public final class OperatorObserveOn<T> implements Operator<T, T> {
private final Scheduler scheduler;
private final boolean delayError;
private final int bufferSize;
/**
* @param scheduler the scheduler to use
* @param delayError delay errors until all normal events are emitted in the other thread?
*/
public OperatorObserveOn(Scheduler scheduler, boolean delayError) {
this(scheduler, delayError, RxRingBuffer.SIZE);
}
/**
* @param scheduler the scheduler to use
* @param delayError delay errors until all normal events are emitted in the other thread?
* @param bufferSize for the buffer feeding the Scheduler workers, defaults to {@code RxRingBuffer.MAX} if <= 0
*/
public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
}
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
public static <T> Operator<T, T> rebatch(final int n) {
return new Operator<T, T>() {
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(Schedulers.immediate(), child, false, n);
parent.init();
return parent;
}
};
}
/** Observe through individual queue per observer. */
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
final Subscriber<? super T> child;
final Scheduler.Worker recursiveScheduler;
final boolean delayError;
final Queue<Object> queue;
/** The emission threshold that should trigger a replenishing request. */
final int limit;
// the status of the current stream
volatile boolean finished;
final AtomicLong requested = new AtomicLong();
final AtomicLong counter = new AtomicLong();
/**
* The single exception if not null, should be written before setting finished (release) and read after
* reading finished (acquire).
*/
Throwable error;
/** Remembers how many elements have been emitted before the requests run out. */
long emitted;
// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
this.delayError = delayError;
int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
// this formula calculates the 75% of the bufferSize, rounded up to the next integer
this.limit = calculatedSize - (calculatedSize >> 2);
if (UnsafeAccess.isUnsafeAvailable()) {
queue = new SpscArrayQueue<Object>(calculatedSize);
} else {
queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
}
// signal that this is an async operator capable of receiving this many
request(calculatedSize);
}
void init() {
// don't want this code in the constructor because `this` can escape through the
// setProducer call
Subscriber<? super T> localChild = child;
localChild.setProducer(new Producer() {
@Override
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(requested, n);
schedule();
}
}
});
localChild.add(recursiveScheduler);
localChild.add(this);
}
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(NotificationLite.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
@Override
public void onCompleted() {
if (isUnsubscribed() || finished) {
return;
}
finished = true;
schedule();
}
@Override
public void onError(final Throwable e) {
if (isUnsubscribed() || finished) {
RxJavaHooks.onError(e);
return;
}
error = e;
finished = true;
schedule();
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
// only execute this from schedule()
@Override
public void call() {
long missed = 1L;
long currentEmission = emitted;
// these are accessed in a tight loop around atomics so
// loading them into local variables avoids the mandatory re-reading
// of the constant fields
final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
// requested and counter are not included to avoid JIT issues with register spilling
// and their access is is amortized because they are part of the outer loop which runs
// less frequently (usually after each bufferSize elements)
for (;;) {
long requestAmount = requested.get();
while (requestAmount != currentEmission) {
boolean done = finished;
Object v = q.poll();
boolean empty = v == null;
if (checkTerminated(done, empty, localChild, q)) {
return;
}
if (empty) {
break;
}
localChild.onNext(NotificationLite.<T>getValue(v));
currentEmission++;
if (currentEmission == limit) {
requestAmount = BackpressureUtils.produced(requested, currentEmission);
request(currentEmission);
currentEmission = 0L;
}
}
if (requestAmount == currentEmission) {
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
return;
}
}
emitted = currentEmission;
missed = counter.addAndGet(-missed);
if (missed == 0L) {
break;
}
}
}
boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {
if (a.isUnsubscribed()) {
q.clear();
return true;
}
if (done) {
if (delayError) {
if (isEmpty) {
Throwable e = error;
try {
if (e != null) {
a.onError(e);
} else {
a.onCompleted();
}
} finally {
recursiveScheduler.unsubscribe();
}
}
} else {
Throwable e = error;
if (e != null) {
q.clear();
try {
a.onError(e);
} finally {
recursiveScheduler.unsubscribe();
}
return true;
} else
if (isEmpty) {
try {
a.onCompleted();
} finally {
recursiveScheduler.unsubscribe();
}
return true;
}
}
}
return false;
}
}
}
Copier le Code
PourobserveOnDeOperator
  1. - Oui.observeOnDeOperator
  2. Adoptionlift Pour changer ça Operator
  3. InOperator Renvoie un pour observeOnDeSubscriber
PourobserveOnDeSubscriber

Appelé àonNextJetez - le dans le thread pour l'exécuter en attendant la méthode

版权声明
本文为[Bruine d'hiver]所创,转载请带上原文链接,感谢
https://javamana.com/2021/09/20210915055130129e.html

  1. 互聯網Java工程師面試題,遇到的面試官都是架構師級別,
  2. 从入门到精通系列Java高级工程师路线介绍,拼多多三面惨败,
  3. 今年最新整理的《高频Java面试题集合》,2021Java通用流行框架大全,
  4. La dernière collection de questions d'entrevue Java haute fréquence organisée cette année, 2021 Java Universal Popular Framework
  5. De l'introduction à l'introduction de l'itinéraire de l'ingénieur principal Java de la série Mastering, il y a eu de nombreux échecs.
  6. JavaScript operator (1), Web Development Engineer
  7. Java simultané Programming Books recommended, half - Runner Java Program see me easy to Attack!
  8. Trier les questions d'entrevue Javascript, trier les points de connaissance des itinéraires d'apprentissage
  9. Xiaopeng P7, a high-value domestic electric car that can't be missed
  10. Song Mengjun's "sleepless night" triggered an upsurge of dance storm after 00
  11. Encapsulated PHP sends HTTP requests with curl. Get and post are very easy to use
  12. Recommend a lightweight and practical excellent Linux panel - wgcloud
  13. 从思维图到基础再到深入,记一次字节跳动Java研发岗的面试经历,
  14. 从底层开始带你了解并发编程,五步搞定Java开发环境部署,
  15. 从基础到源码统统帮你搞定,一招彻底帮你搞定HashMap源码,
  16. 從基礎到源碼統統幫你搞定,一招徹底幫你搞定HashMap源碼,
  17. De la base au code source pour vous aider à résoudre tout, un tour pour vous aider à résoudre complètement le code source hashtap,
  18. Commencez par le bas pour vous familiariser avec la programmation simultanée, et terminez le déploiement de l'environnement de développement Java en cinq étapes.
  19. De la carte de pensée à la base et à l'approfondissement, prenez note de l'expérience d'entrevue d'un octet sautant le poste de recherche et développement Java.
  20. Open source: Suzhou tourism strategy based on pyecharts visual analysis
  21. Good play | every character hates it. How does jade building spring do it?
  22. 從底層開始帶你了解並發編程,五步搞定Java開發環境部署,
  23. 以商品超卖为例讲解Redis分布式锁,一招彻底帮你搞定HashMap源码,
  24. 从青铜到王者的路线,2021Java者未来的出路在哪里?
  25. JavaScript Advanced Programming (3rd Edition) Reading note 6
  26. 從青銅到王者的路線,2021Java者未來的出路在哪裏?
  27. Quelle est la voie à suivre pour les 2021 Java du bronze au roi?
  28. Prenez l'exemple de la surproduction de marchandises pour expliquer redis Distributed Lock, un tour complet pour vous aider à résoudre le code source de hashtap,
  29. 以商品超賣為例講解Redis分布式鎖,一招徹底幫你搞定HashMap源碼,
  30. Win10系统 java环境配置
  31. Non-ASCII character ‘\xe5‘ in file kf1.py on line 4, but no encoding declared; see http://python.or
  32. 手把手教你搭建微信小程序服务器(HTTPS)
  33. JavaScript Review sketch - 1
  34. sqli-labs-less-18 http头user agent+报错注入
  35. Git下载、安装、配置、配合Intellij Idea实现代码版本控制
  36. NHibernate inheritance
  37. Summary of basic knowledge points of JavaScript language (mind map)
  38. GIT télécharge, installe, configure et implémente le contrôle de version de code avec intellij idea
  39. Sqli Labs - less - 18 http header user agent + Error Reporting Injection
  40. Vous apprendrez à construire un serveur d'applet Wechat (https) à la main
  41. Non - ASCII character 'xe5' in file kf1.py on Line 4, but no Encoding declared;Voirhttp://python.or
  42. 作为一名程序员我不忘初心,Java最新实习面试经验总结,
  43. 作为一名Java面试者你应该知道的,2021最新Java常用开源库总结,
  44. 作为一个程序员,你觉得最大的悲哀是什么,2021年大厂Java岗面试必问,
  45. Configuration de l'environnement Java du système win10
  46. 作為一個程序員,你覺得最大的悲哀是什麼,2021年大廠Java崗面試必問,
  47. En tant que programmeur, quelle est la plus grande tristesse que vous ressentez? L'entrevue d'emploi Java de 2021 dans une grande usine vous demandera:
  48. Comme vous devriez le savoir en tant qu'intervieweur Java, 2021 dernier résumé des bibliothèques open source couramment utilisées pour Java,
  49. En tant que programmeur, je n'oublie pas le dernier résumé de mon expérience d'entrevue de stage en Java.
  50. 作為一名Java面試者你應該知道的,2021最新Java常用開源庫總結,
  51. New feature of Java 8. Stream (). Map (general programming method: collect. Groupingby)
  52. Computer graduation project java + SSM hospital registration system
  53. 作為一名程序員我不忘初心,Java最新實習面試經驗總結,
  54. 使用Docker部署Spring-Boot项目,论程序员成长的正确姿势,
  55. Conseils pour améliorer l'efficacité du Code Java mille fois
  56. 全网首发,我在华为做Java外包的真实经历!
  57. 全套Java视频百度云,终于找到一个看得懂的JVM内存模型了,
  58. Docker tutorial series (I) introduction to docker tutorial spring cloud mybatis distributed microservice Cloud Architecture
  59. 全網首發,我在華為做Java外包的真實經曆!
  60. Run around with money? Li Weijia fell into the storm of endorsement! In the face of collective hot discussion, personal attitude has become the focus of attention