Поток извлечен из Postgres с jOOQ, не возвращающим результаты из класса - PullRequest
1 голос
/ 28 января 2020

Issue

Я пытаюсь stream получить результаты запроса postgres во внешнем приложении, вместо того, чтобы охотно получать все результаты. Проблема в том, что я могу видеть только потоковые результаты только в моем терминале (то есть сначала в "org.jooq.tools.LoggerListener : Record fetched: ...", а затем с stream.get().forEach(s -> debug)), а класс, который ссылается на этот поток, выдает значения null только при вызове для просмотра ResultSet во внешнем интерфейсе.

Эти данные могут использоваться и для других задач (например, визуализация, загрузка / экспорт, сводная статистика и т. Д. c.). Я просматривал документацию и сообщения о jOOQ, которые я использую в качестве своего ORM, и я пытаюсь использовать следующие методы:

С нетерпением получать со следующим прекрасно работает сейчас, но это вернет все в одном гигантском ResponseEntity и не будет передавать результаты:


Текущие классы

DataController. java

@RestController
@RequestMapping(value = "/v3")
@Validated
public class DataController {

  @Autowired private QueryService queryService;

  @PostMapping(value = "/data", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
  @ApiOperation(value = "Query the data")
  @ResponseStatus(HttpStatus.CREATED)
  public ResponseEntity<QueryResult> getQueryResults(
      @RequestBody @ValidQuery Query query, HttpServletRequest request) {

    QueryResult res = queryService.search(query);
    return ResponseEntity.ok(res);
  }
// ...
}

QueryResult. java

public QueryResult(Stream<Record> result) {
    this.result = result;
  }

//  public List<Map<String, Object>> getResult() { return result; }
  @JsonProperty("result")
  public Stream<Record> getResult() { return result; }


//  public void setResult(List<Map<String, Object>> result) { this.result = result; }
  public void setResult(Stream<Record> result) { this.result = result; }

}

QueryService. java

@Service
public class QueryService implements SearchService{
  @Autowired DefaultDSLContext dslContext;

  public QueryResult search(Query query) {

    LinkedHashMap<DataSourceName, List<String>> selections = query.getSelections();

    // Build selected fields
    List<SelectField> selectFields = QueryUtils.getSelectionFields(selections);

    // Current support is for a single query. All others passed will be ignored
    List<Filter> filters = query.getFilters();
    Filter leadingFilter = QueryUtils.getLeadingFilter(filters);

    // Build "where" conditions
    Condition conditionClause = QueryUtils.getConditionClause(leadingFilter);

    // Get "from" statement
    Table<Record> fromClause = QueryUtils.getFromStatement(fromDataSource,query.getJoins());

    /*
    // Works fine, but is not lazy fetching
    List<Map<String, Object>> results =
        dslContext
            .select(selectFields)
            .from(fromClause)
            .where(conditionClause)
            .limit(query.getOffset(), query.getLimit())
            .fetchMaps();
    */

      // Appears to work only once. 
      // Cannot see any results returned, but the number of records is correct. 
      // Everything in the records is null / undefined in the frontend
      Supplier<Stream<Record>> results = () ->
              dslContext
                      .select(selectFields)
                      .from(fromClause)
                      .where(conditionClause)
                      .limit(query.getOffset(), query.getLimit())
                      .fetchStream();

      // "stream has already been operated upon or closed" is returned when using a Supplier
      results.get().forEach(s -> logger.debug("Streamed record: \n" + String.valueOf(s)));

      return new QueryResult(results.get());

  }
}

Запрос. java

public class Query {
  @NotNull(message = "Query must contain selection(s)")
  private LinkedHashMap<DataSourceName, List<String>> selections;
  private List<Filter> filters;
  private List<Join> joins;
  private List<Sort> sorts;
  private long offset;
  private int limit;

  private QueryOptions options;

  @JsonProperty("selections")
  public LinkedHashMap<DataSourceName, List<String>> getSelections() {
    return selections;
  }

  public void setSelections(LinkedHashMap<DataSourceName, List<String>> selections) {
    this.selections = selections;
  }

  @JsonProperty("filters")
  public List<Filter> getFilters() {
    return filters;
  }

  public void setFilters(List<Filter> filters) {
    this.filters = filters;
  }

  @JsonProperty("joins")
  public List<Join> getJoins() {
    return joins;
  }

  public void setJoins(List<Join> joins) {
    this.joins = joins;
  }

  @JsonProperty("sorts")
  public List<Sort> getSorts() {
    return sorts;
  }

  public void setSorts(List<Sort> sorts) {
    this.sorts = sorts;
  }

  @JsonProperty("options")
  public QueryOptions getOptions() {
    return options;
  }

  public void setOptions(QueryOptions options) {
    this.options = options;
  }

  @JsonProperty("offset")
  public long getOffset() {
    return offset;
  }

  public void setOffset(long offset) {
    this.offset = offset;
  }

  @JsonProperty("limit")
  public int getLimit() {
    return limit;
  }

  public void setLimit(int limit) {
    this.limit = limit;
  }

  @Override
  public String toString() {
    return "Query{"
        + "selections=" + selections
        + ", filters="  + filters
        + ", sorts="    + sorts
        + ", offSet="   + offset
        + ", limit="    + limit
        + ", options="  + options
        + '}';
  }
}

DataApi . js

// ...
const dataApi = axios.create({baseURL: `${my_data_url}`,});
// ...
export default dataApi;

Data.jsx

// ...

// This block queries Spring, and it returns the ResponseEntity with the ResultSet
// Streaming returns the right number of records, but every record is null / undefined
try {
      const response = await dataApi.post('/v3/data', query);
} catch (error) {
// ...
}
// ...

Возвращенный результат в консоли

{data: {…}, status: 200, statusText: "OK", headers: {…}, config: {…}, …}
data:
result: Array(100)
0: {}
1: {}
2: {}
3: {}
...

Стек:

  • Docker: 19.03.5
  • Spring Boot : v2.1.8.RELEASE
  • Узел : v12.13.1
  • Реагировать : 16,9. 0
  • OpenJDK : 12.0.2
  • jOOQ : 3.12.3
  • postgres : 10,7

1 Ответ

1 голос
/ 29 января 2020

Весь смысл API Java Stream в том, что такой поток будет использоваться не более одного раза. Он не имеет никакой функции буферизации и не поддерживает потоковую модель на основе pu sh, как это делают реализации реактивного потока.

Вы можете добавить другой API в свой стек, например, Reactor (есть и другие, но поскольку вы уже используете Spring ...), который поддерживает буферизацию и воспроизведение потоков для нескольких потребителей, но не имеет никакого отношения к jOOQ напрямую и сильно повлияет на архитектуру вашего приложения.

Обратите внимание, что jOOQ ResultQuery расширяет org.reactivestreams.Publisher и JDK 9 Flow.Publisher для лучшей совместимости с такими реактивными потоками.

...