как ваше первоначальное значение вместо.

ользуюWebClient и обычайBodyExtractorкласс для моего приложения весенней загрузки

WebClient webLCient = WebClient.create();
webClient.get()
   .uri(url, params)
   .accept(MediaType.APPLICATION.XML)
   .exchange()
   .flatMap(response -> {
     return response.body(new BodyExtractor());
   })

BodyExtractor.java

@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
  Flux<DataBuffer> body = response.getBody();
  body.map(dataBuffer -> {
    try {
      JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
      Unmarshaller unmarshaller = jc.createUnmarshaller();

      return (T) unmarshaller.unmarshal(dataBuffer.asInputStream())
    } catch(Exception e){
       return null;
    }
  }).next();
}

Вышеупомянутый код работает с небольшой полезной нагрузкой, но не с большой полезной нагрузкой, я думаю, это потому, что я читаю только одно значение потока сnext и я не уверен, как объединить и прочитать всеdataBuffer.

Я новичок в реакторе, поэтому я не знаю много трюков с флюсом / моно.

Ответы на вопрос(3)

InputStream побеждает цель использованияWebClient во-первых, потому что ничего не будет излучаться доcollect операция завершена. Для большого потока это может быть очень долго. Реактивная модель не имеет дело с отдельными байтами, но блоки байтов (как SpringDataBuffer). Смотрите мой ответ здесь для более элегантного решения:https://stackoverflow.com/a/48054615/839733

 Nicolas Raoul12 февр. 2019 г., 03:20
Вы ссылаетесь на другой ответ, который, в свою очередь, ссылается на другой ответ с текущим счетом -2. Не могли бы вы удалить ссылку и объяснить все в этом ответе? Спасибо!
 Abhijit Sarkar12 февр. 2019 г., 03:39
@NicolasRaoul Не стесняйтесь вносить изменения, если считаете, что это улучшает ответ. Меня не интересует конкурс популярности, поэтому я не забочусь о даунтерсах.
Решение Вопроса

Я смог заставить его работать с помощьюFlux#collect а такжеSequenceInputStream

@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
  Flux<DataBuffer> body = response.getBody();
  return body.collect(InputStreamCollector::new, (t, dataBuffer)-> t.collectInputStream(dataBuffer.asInputStream))
    .map(inputStream -> {
      try {
        JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
        Unmarshaller unmarshaller = jc.createUnmarshaller();

        return (T) unmarshaller.unmarshal(inputStream);
      } catch(Exception e){
        return null;
      }
  }).next();
}

InputStreamCollector.java

public class InputStreamCollector {
  private InputStream is;

  public void collectInputStream(InputStream is) {
    if (this.is == null) this.is = is;
    this.is = new SequenceInputStream(this.is, is);
  }

  public InputStream getInputStream() {
    return this.is;
  }
}
 Brian Clozel28 сент. 2017 г., 11:28
почему вы пишете свой собственный BodyExtractor? WebFlux уже поддерживает Jaxb с Jaxb2XmlDecoder.
 Bk Santiago29 сент. 2017 г., 03:30
@BrianClozel мне нужно что-то настроить для его работы?bodyToMono похоже, не подхватывает моё pojo.
 Bk Santiago14 дек. 2017 г., 03:43
@AbhijitSarkar, пожалуйста, проверьте мое использование выше.
 Abhijit Sarkar07 дек. 2017 г., 01:09
какойInputStreamCollector?
 Abhijit Sarkar14 дек. 2017 г., 04:50
Интересно, ноWebClient это неправильный инструмент для этой работы. Вы восстанавливаете ответInputStreamТаким образом, вы не получаете никаких преимуществ от использованияWebClient, Вам лучше использовать простой ванильный HTTP-клиент.

reduce() вместоcollect(), Очень похоже, но не требует дополнительного класса:

Ява:

body.reduce(new InputStream() {
    public int read() { return -1; }
  }, (s: InputStream, d: DataBuffer) -> new SequenceInputStream(s, d.asInputStream())
).flatMap(inputStream -> /* do something with single InputStream */

Или Котлин

body.reduce(object : InputStream() {
  override fun read() = -1
}) { s: InputStream, d -> SequenceInputStream(s, d.asInputStream()) }
  .flatMap { inputStream -> /* do something with single InputStream */ }

Преимущество такого подхода перед использованиемcollect() просто вам не нужно иметь другой класс, чтобы собрать вещи.

Я создал новый пустойInputStream(), но если этот синтаксис сбивает с толку, вы также можете заменить его наByteArrayInputStream("".toByteArray()) вместо того, чтобы создать пустойByteArrayInputStream как ваше первоначальное значение вместо.

Ваш ответ на вопрос