暂时只能在onComplete中设置: CompletableFuture.complete来通知调用方结束了. 示例: Flowable由持久层方法返回,是调用方中的:`result.getAll(dbName.get(), strategy)` 和 `result.getTableColumn(table)` public class ConsoleSchemaFlowableOutput implements SchemaFlowableOutput { private final static Logger logger = LoggerFactory.getLogger(ConsoleSchemaFlowableOutput.class); private volatile CompletableFuture future = new CompletableFuture(); private AtomicInteger count = new AtomicInteger(0); @Override public Disposable flush(Information information, Flowable table) throws SchemaExportException { logger.info("Start Flowable Flush"); Disposable export_flush_complete = table.subscribe(tableIns -> { System.out.println(printAsciiTable(tableIns)); System.out.println(printAsciiColumns(tableIns.getColumns())); System.out.println("\r\n"); count.addAndGet(1); }, throwable -> { logger.debug("Export Break, reason: " + throwable.getMessage()); future.cancel(true); throw new SchemaExportException(throwable); }, new Action() { @Override public void run() throws Exception { logger.debug("Export Complete, Affect Size:"+count.get()); future.complete("OK"); } }); return export_flush_complete; } @Override public CompletableFuture getFuture() { return future; } ... } 调用方: public void export(Information info, SchemaFlowableOutput out) throws SchemaExportException{ long startStamp = System.currentTimeMillis(); // Flowable Flowable tableFlowable = result.getAll(dbName.get(), strategy).flatMap(new Function>() { @Override public Publisher apply(@NonNull Table table) throws Exception { return result.getTableColumn(table).flatMap(new Function, SingleSource>() { @Override public SingleSource apply(@NonNull List columns) throws Exception { return Single.just(table.fillColumn(columns)); } }).flatMapPublisher(new Function>() { @Override public Publisher apply(@NonNull Table table) throws Exception { return Flowable.just(table); } }); } }); Disposable disposable = null; try { disposable = out.flush(info, tableFlowable); CompletableFuture future= out.getFuture(); while(!future.isDone()){ logger.info("[ERE-Flowable]未完成,线程休眠1秒"); Thread.currentThread().sleep(1000,0); } String result = future.get(); logger.info("[ERE-Flowable]完成, 结果:"+result); if(result.equals("OK")){ long finishStamp = System.currentTimeMillis(); clearHander(disposable, "[ERE-Flowable]RxJava disposed because complete, WithTime: "+(finishStamp-startStamp)); } }catch (Exception e){ clearHander(disposable, "[ERE-Flowable]RxJava disposed has Exception: "+e.getMessage()); } } private void clearHander(Disposable disposable, String reason){ logger.info(reason); if(null!=disposable && !disposable.isDisposed()) { disposable.dispose(); }else{ if(null != disposable) { logger.info("[CH]disposable status:" + disposable.isDisposed()); }else{ logger.info("[CH]disposable is null:"); } } // 结束后的回调,执行一些清理工作 completeHandler.apply(); }