путь?

я есть очередь блокировки, которая продолжает получать сообщения через какое-то приложение, теперь в приложении asp.net я пытался использовать очередь и записать вывод в файл CSV / JSON.

Здесь я хочу хранить сообщения размером до 1 МБ, которые получают из очереди блокировки, а затем записывают их, теперь снова сохраняют данные на 1 МБ и снова записывают ... и так далее.

В приведенном ниже коде я используюsystem.reactive буфера и может содержать число наблюдаемых и записывать в JSON, но есть ли какой-либо способ по размеру наблюдаемых?

class Program
{
    private static readonly BlockingCollection<Message> MessagesBlockingCollection = new BlockingCollection<Message>();

    private static void Producer()
    {
        int ctr = 1;
        while (ctr <= 11)
        {
            MessagesBlockingCollection.Add(new Message { Id = ctr, Name = $"Name-{ctr}" });
            Thread.Sleep(1000);
            ctr++;
        }
    }

    private static void Consumer()
    {
        var observable = MessagesBlockingCollection.GetConsumingEnumerable().ToObservable();

        var bufferedNumberStream = observable.BufferWithThrottle(TimeSpan.FromSeconds(60), 5)
                                    .Subscribe(ts =>
                                    {
                                        WriteToFile(ts.ToList());
                                    });
    }

    private static void WriteToFile(List<Message> listToWrite)
    {
        using (StreamWriter outFile = System.IO.File.CreateText(Path.Combine(@"C:\TEMP", $"{DateTime.Now.ToString("yyyyMMddHHmmssfff")}.json")))
        {
            outFile.Write(JsonConvert.SerializeObject(listToWrite));
        }
    }

    static void Main(string[] args)
    {
        var producer = Task.Factory.StartNew(() => Producer());
        var consumer = Task.Factory.StartNew(() => Consumer());
        Console.Read();
    }
}

Реактивный метод продления,

public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(this IObservable<TSource> source,
                                                                            TimeSpan threshold, int noOfStream)
    {
        return Observable.Create<IList<TSource>>((obs) =>
        {
            return source.GroupByUntil(_ => true,
                                       g => g.Throttle(threshold).Select(_ => Unit.Default)
                                             .Merge(g.Buffer(noOfStream).Select(_ => Unit.Default)))
                         .SelectMany(i => i.ToList())
                         .Subscribe(obs);
        });
    }

Класс сообщения,

public class Message
{
    public int Id { get; set; }
    public string Name { get; set; }
}

Ответы на вопрос(0)

Ваш ответ на вопрос