Способ протолкнуть буферизованные события через равные промежутки времени
Чего я пытаюсь достичь, так это буферизировать поступающие события от некоторых IObservable (они приходят в виде пакетов) и выпускать их дальше, но по одному, через равные промежутки времени.
Вот так:
-oo-ooo-oo------------------oooo-oo-o-------------->
-o--o--o--o--o--o--o--------o--o--o--o--o--o--o---->
Поскольку я довольно новичок в Rx , я не уверен, есть ли уже субъект или оператор, который делает именно это. Может быть, это можно сделать композицией?
Обновление:
Спасибо
Richard Szalay для указания оператора Drain я нашел другой пример использования оператора слива Джеймсом Майлзом . Вот как мне удалось заставить его работать в приложении WPF:
.Drain(x => {
Process(x);
return Observable.Return(new Unit())
.Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher );
}).Subscribe();
Я немного повеселился, потому что пропуск параметра scheduler приводит к сбою приложения в режиме отладки без каких-либо исключений ( мне нужно научиться справляться с исключениями в Rx).
Метод Process напрямую изменяет состояние пользовательского интерфейса, но я думаю, что сделать IObservable из него довольно просто (используя ISubject?).
Обновление:
В тем временем я экспериментировал с ISubject, класс ниже делает то, что я хотел - он своевременно выпускает буферизованные Ts:
public class StepSubject<T> : ISubject<T>
{
IObserver<T> subscriber;
Queue<T> queue = new Queue<T>();
MutableDisposable cancel = new MutableDisposable();
TimeSpan interval;
IScheduler scheduler;
bool idle = true;
public StepSubject(TimeSpan interval, IScheduler scheduler)
{
this.interval = interval;
this.scheduler = scheduler;
}
void Step()
{
T next;
lock (queue)
{
idle = queue.Count == 0;
if (!idle)
next = queue.Dequeue();
}
if (!idle)
{
cancel.Disposable = scheduler.Schedule(Step, interval);
subscriber.OnNext(next);
}
}
public void OnNext(T value)
{
lock (queue)
queue.Enqueue(value);
if (idle)
cancel.Disposable = scheduler.Schedule(Step);
}
public IDisposable Subscribe(IObserver<T> observer)
{
subscriber = observer;
return cancel;
}
}
Эта наивная реализация лишена OnCompleted и OnError для ясности, также разрешена только одна подписка.
3 ответов:
На самом деле все гораздо сложнее, чем кажется.
Использование
Delayне работает, потому что значения по-прежнему будут происходить массово, только с небольшой задержкой.Использование
Intervalлибо сCombineLatest, либо сZipне работает, так как первый вызовет пропуск исходных значений, а второй-буферизацию интервальных значений.Я думаю, что новый оператор
Drain(, добавленный в 1.0.2787.0 ), в сочетании сDelayдолжен сделать трюк:source.Drain(x => Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)).StartWith(x));Оператор
Drainработает какSelectMany, но ожидает завершения предыдущего вывода, прежде чем вызвать селектор со следующим значением.это все еще не точно то, что вы ищете (первое значение в блоке также будет отложено), но это близко:использование выше соответствует вашей мраморной диаграмме сейчас.Edit: очевидно, что
Drainв рамках не работает какSelectMany. Я попрошу совета на официальных форумах. В то же время, вот реализация дренажа, который делает то, что вы после:Edit 09/11: исправлены ошибки в реализации и обновлено использование, чтобы соответствовать запрошенной Вами мраморной диаграмме.
public static class ObservableDrainExtensions { public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source, Func<TSource, IObservable<TOut>> selector) { return Observable.Defer(() => { BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit()); return source .Zip(queue, (v, q) => v) .SelectMany(v => selector(v) .Do(_ => { }, () => queue.OnNext(new Unit())) ); }); } }
Просто для полноты здесь приводится альтернативная (более компактная) версия метода Drain (), предложенная Ричардом:
public static IObservable<T2> SelectManySequential<T1, T2>( this IObservable<T1> source, Func<T1, IObservable<T2>> selector ) { return source .Select(x => Observable.Defer<T2>(() => selector(x))) .Concat(); }Смотрите нить Drain + SelectMany = ? в форуме Rx.
Обновление: Я понял, что перегрузка Concat (), которую я использовал, была одним из моих личных расширений Rx, которые (еще не) являются частью фреймворка. Я сожалею об этой ошибке .. Конечно, это делает мое решение менее элегантным, чем я думал.
Тем не менее для полноты I пост вот мой контакт() метод расширения перегрузки:
public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> source) { return Observable.CreateWithDisposable<T>(o => { var lockCookie = new Object(); bool completed = false; bool subscribed = false; var waiting = new Queue<IObservable<T>>(); var pendingSubscription = new MutableDisposable(); Action<Exception> errorHandler = e => { o.OnError(e); pendingSubscription.Dispose(); }; Func<IObservable<T>, IDisposable> subscribe = null; subscribe = (ob) => { subscribed = true; return ob.Subscribe( o.OnNext, errorHandler, () => { lock (lockCookie) { if (waiting.Count > 0) pendingSubscription.Disposable = subscribe(waiting.Dequeue()); else if (completed) o.OnCompleted(); else subscribed = false; } } ); }; return new CompositeDisposable(pendingSubscription, source.Subscribe( n => { lock (lockCookie) { if (!subscribed) pendingSubscription.Disposable = subscribe(n); else waiting.Enqueue(n); } }, errorHandler , () => { lock (lockCookie) { completed = true; if (!subscribed) o.OnCompleted(); } } ) ); }); }А теперь избиваю себя собственным оружием.: Тот же метод Concat() можно было бы написать гораздо более элегантно в блестящем стиле Ричарда Салея:
Таким образом, заслуга принадлежит Ричарду. :- )public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> source) { return Observable.Defer(() => { BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit()); return source .Zip(queue, (v, q) => v) .SelectMany(v => v.Do(_ => { }, () => queue.OnNext(new Unit())) ); }); }
Вот как я это сделал, просто используя явную очередь (ReactiveCollection - это просто причудливая версия ObservableCollection WPF-ReactiveCollection.ItemsAdded OnNext для каждого добавленного элемента, как вы можете себе представить):
Https://github.com/xpaulbettsx/ReactiveXaml/blob/master/ReactiveXaml/ReactiveCollection.cs#L309
public static ReactiveCollection<T> CreateCollection<T>(this IObservable<T> FromObservable, TimeSpan? WithDelay = null) { var ret = new ReactiveCollection<T>(); if (WithDelay == null) { FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(ret.Add); return ret; } // On a timer, dequeue items from queue if they are available var queue = new Queue<T>(); var disconnect = Observable.Timer(WithDelay.Value, WithDelay.Value) .ObserveOn(RxApp.DeferredScheduler).Subscribe(_ => { if (queue.Count > 0) { ret.Add(queue.Dequeue()); } }); // When new items come in from the observable, stuff them in the queue. // Using the DeferredScheduler guarantees we'll always access the queue // from the same thread. FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(queue.Enqueue); // This is a bit clever - keep a running count of the items actually // added and compare them to the final count of items provided by the // Observable. Combine the two values, and when they're equal, // disconnect the timer ret.ItemsAdded.Scan0(0, ((acc, _) => acc+1)).Zip(FromObservable.Aggregate(0, (acc,_) => acc+1), (l,r) => (l == r)).Where(x => x != false).Subscribe(_ => disconnect.Dispose()); return ret; }
Comments