Способ протолкнуть буферизованные события через равные промежутки времени



Чего я пытаюсь достичь, так это буферизировать поступающие события от некоторых IObservable (они приходят в виде пакетов) и выпускать их дальше, но по одному, через равные промежутки времени.
Вот так:



-oo-ooo-oo------------------oooo-oo-o-------------->

-o--o--o--o--o--o--o--------o--o--o--o--o--o--o---->


Поскольку я довольно новичок в Rx , я не уверен, есть ли уже субъект или оператор, который делает именно это. Может быть, это можно сделать композицией?

Обновление:



Спасибо
Richard Szalay для указания оператора Drain я нашел другой пример использования оператора слива Джеймсом Майлзом . Вот как мне удалось заставить его работать в приложении WPF:



    .Drain(x => {
Process(x);
return Observable.Return(new Unit())
.Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher );
}).Subscribe();


Я немного повеселился, потому что пропуск параметра scheduler приводит к сбою приложения в режиме отладки без каких-либо исключений ( мне нужно научиться справляться с исключениями в Rx).
Метод Process напрямую изменяет состояние пользовательского интерфейса, но я думаю, что сделать IObservable из него довольно просто (используя ISubject?).



Обновление:



В тем временем я экспериментировал с ISubject, класс ниже делает то, что я хотел - он своевременно выпускает буферизованные Ts:



public class StepSubject<T> : ISubject<T>
{
IObserver<T> subscriber;
Queue<T> queue = new Queue<T>();
MutableDisposable cancel = new MutableDisposable();
TimeSpan interval;
IScheduler scheduler;
bool idle = true;

public StepSubject(TimeSpan interval, IScheduler scheduler)
{
this.interval = interval;
this.scheduler = scheduler;
}

void Step()
{
T next;
lock (queue)
{
idle = queue.Count == 0;
if (!idle)
next = queue.Dequeue();
}

if (!idle)
{
cancel.Disposable = scheduler.Schedule(Step, interval);
subscriber.OnNext(next);
}
}

public void OnNext(T value)
{
lock (queue)
queue.Enqueue(value);

if (idle)
cancel.Disposable = scheduler.Schedule(Step);
}

public IDisposable Subscribe(IObserver<T> observer)
{
subscriber = observer;
return cancel;
}
}


Эта наивная реализация лишена OnCompleted и OnError для ясности, также разрешена только одна подписка.

827   3  

3 ответов:

На самом деле все гораздо сложнее, чем кажется.

Использование Delay не работает, потому что значения по-прежнему будут происходить массово, только с небольшой задержкой.

Использование Interval либо с CombineLatest, либо с Zip не работает, так как первый вызовет пропуск исходных значений, а второй-буферизацию интервальных значений.

Я думаю, что новый оператор Drain (, добавленный в 1.0.2787.0 ), в сочетании с Delay должен сделать трюк:

source.Drain(x => Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)).StartWith(x));

Оператор Drain работает как SelectMany, но ожидает завершения предыдущего вывода, прежде чем вызвать селектор со следующим значением. это все еще не точно то, что вы ищете (первое значение в блоке также будет отложено), но это близко: использование выше соответствует вашей мраморной диаграмме сейчас.

Edit: очевидно, что Drain в рамках не работает как SelectMany. Я попрошу совета на официальных форумах. В то же время, вот реализация дренажа, который делает то, что вы после:

Edit 09/11: исправлены ошибки в реализации и обновлено использование, чтобы соответствовать запрошенной Вами мраморной диаграмме.

public static class ObservableDrainExtensions
{
    public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source, 
        Func<TSource, IObservable<TOut>> selector)
    {
        return Observable.Defer(() =>
        {
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

            return source
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    .Do(_ => { }, () => queue.OnNext(new Unit()))
                );
        });
    }
}

Просто для полноты здесь приводится альтернативная (более компактная) версия метода Drain (), предложенная Ричардом:

public static IObservable<T2> SelectManySequential<T1, T2>(
    this IObservable<T1> source, 
    Func<T1, IObservable<T2>> selector
)
{
    return source
        .Select(x => Observable.Defer<T2>(() => selector(x)))
        .Concat();
}

Смотрите нить Drain + SelectMany = ? в форуме Rx.

Обновление: Я понял, что перегрузка Concat (), которую я использовал, была одним из моих личных расширений Rx, которые (еще не) являются частью фреймворка. Я сожалею об этой ошибке .. Конечно, это делает мое решение менее элегантным, чем я думал.

Тем не менее для полноты I пост вот мой контакт() метод расширения перегрузки:

public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> source)
{
    return Observable.CreateWithDisposable<T>(o =>
    {
        var lockCookie = new Object();
        bool completed = false;
        bool subscribed = false;
        var waiting = new Queue<IObservable<T>>();
        var pendingSubscription = new MutableDisposable();

        Action<Exception> errorHandler = e =>
        {
            o.OnError(e);
            pendingSubscription.Dispose();
        };

        Func<IObservable<T>, IDisposable> subscribe = null;
        subscribe = (ob) =>
        {
            subscribed = true;
            return ob.Subscribe(
                o.OnNext,
                errorHandler,
                () =>
                {
                    lock (lockCookie)
                    {
                        if (waiting.Count > 0)
                            pendingSubscription.Disposable = subscribe(waiting.Dequeue());
                        else if (completed)
                            o.OnCompleted();
                        else
                            subscribed = false;
                    }
                }
            );
        };

        return new CompositeDisposable(pendingSubscription,
            source.Subscribe(
                n =>
                {
                    lock (lockCookie)
                    {
                        if (!subscribed)
                            pendingSubscription.Disposable = subscribe(n);
                        else
                            waiting.Enqueue(n);
                    }

                },
                errorHandler
                , () =>
                {
                    lock (lockCookie)
                    {
                        completed = true;
                        if (!subscribed)
                            o.OnCompleted();
                    }
                }
            )
        );
    });
}

А теперь избиваю себя собственным оружием.: Тот же метод Concat() можно было бы написать гораздо более элегантно в блестящем стиле Ричарда Салея:

public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> source)
{
    return Observable.Defer(() =>
    {
        BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
        return source
            .Zip(queue, (v, q) => v)
            .SelectMany(v => 
                v.Do(_ => { }, () => queue.OnNext(new Unit()))
            );
    });
}
Таким образом, заслуга принадлежит Ричарду. :- )

Вот как я это сделал, просто используя явную очередь (ReactiveCollection - это просто причудливая версия ObservableCollection WPF-ReactiveCollection.ItemsAdded OnNext для каждого добавленного элемента, как вы можете себе представить):

Https://github.com/xpaulbettsx/ReactiveXaml/blob/master/ReactiveXaml/ReactiveCollection.cs#L309

public static ReactiveCollection<T> CreateCollection<T>(this IObservable<T> FromObservable, TimeSpan? WithDelay = null)
{
    var ret = new ReactiveCollection<T>();
    if (WithDelay == null) {
        FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(ret.Add);
        return ret;
    }

    // On a timer, dequeue items from queue if they are available
    var queue = new Queue<T>();
    var disconnect = Observable.Timer(WithDelay.Value, WithDelay.Value)
        .ObserveOn(RxApp.DeferredScheduler).Subscribe(_ => {
            if (queue.Count > 0) { 
                ret.Add(queue.Dequeue());
            }
        });

    // When new items come in from the observable, stuff them in the queue.
    // Using the DeferredScheduler guarantees we'll always access the queue
    // from the same thread.
    FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(queue.Enqueue);

    // This is a bit clever - keep a running count of the items actually 
    // added and compare them to the final count of items provided by the
    // Observable. Combine the two values, and when they're equal, 
    // disconnect the timer
    ret.ItemsAdded.Scan0(0, ((acc, _) => acc+1)).Zip(FromObservable.Aggregate(0, (acc,_) => acc+1), 
        (l,r) => (l == r)).Where(x => x != false).Subscribe(_ => disconnect.Dispose());

    return ret;
}

Comments

    Ничего не найдено.