基于RxJava2.0+Retrofit2.0超大文件分块(分片)上传(带进度)
当上传文件过大时,直接上传会增加服务器负载,负载过高会导致机器无法处理其他请求及操作,甚至导致宕机,这时分块上传是最佳选择。本次简单介绍两种分块上传文件方案(暂不考虑文件安全相关问题,如签名):
方案一: 客户端把单个文件切割成若干块,依次上传,最后由服务端合并
方案二: 客户端读取文件 offset 到 offset+chunk的文件块上传,每次上传完返回新的offset(或者每次上传后,本地保存最新的offset),客户端更新offset值并继续下一次上传,本文为了测试方便,最新的偏移量本地计算好即不通过接口返回最新偏移量
文件分块上传与断点下载相比,断点下载的进度保存在客户端(使用数据库保存),分块上传的进度保存在服务端并由服务端合并
效果图:
两种方案对比,方案二上传时间更短,因为方案一基于文件操作,方案二基于内存操作,所以方案二上传时间更短
方案一
客服端:
public void uploadTest1(View view) {
countUploadSize = 0;
clearDir();
totalSize = file.length();
//分割文件并组装数据上传,文件分割数量根据实际情况调整
List<File> cut = FileUtils.split(file.getAbsolutePath(), 10);
int totalChunk = cut.size();
FileChunkReq[] items = new FileChunkReq[cut.size()];
for (int i = 0; i < cut.size(); i++) {
FileChunkReq fileChunkReq = new FileChunkReq();
fileChunkReq.file = cut.get(i);
fileChunkReq.fileName = file.getName();
fileChunkReq.fileSize = totalSize;
fileChunkReq.chunkNum = i + 1;
fileChunkReq.totalChunk = totalChunk;
fileChunkReq.uuid = UUID;
items[i] = fileChunkReq;
}
startTime = System.currentTimeMillis();
Observable.fromArray(items)
.concatMap(new Function<FileChunkReq, ObservableSource<ResponseResult>>() {
@Override
public ObservableSource<ResponseResult> apply(FileChunkReq fcq) throws Exception {
return upload1(fcq);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<ResponseResult>() {
@Override
public void accept(ResponseResult responseResult) throws Exception {
showMsg("分块上传方案1responseResult:" + responseResult.toString());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
showMsg("分块上传方案1异常:" + throwable.getMessage());
}
}, new Action() {
@Override
public void run() throws Exception {
showMsg("分块上传方案1完毕");
}
});
}
接口调用
/**
* 分块上传1
*
* @param fileChunkReq
* @return
*/
private Observable<ResponseResult> upload1(FileChunkReq fileChunkReq) {
//"application/octet-stream"
//"multipart/form-data"
FileProgressRequestBody filePart = new FileProgressRequestBody(fileChunkReq.file, "application/octet-stream", new FileProgressRequestBody.ProgressListener() {
@Override
public void progress(long upload, long totalUpload) {
synchronized (obj) {
showProgress(upload);
Log.w(TAG, "上传方案1,countUploadSize:" + countUploadSize + ",percent:" + percent);
}
}
});
final MultipartBody requestBody = new MultipartBody.Builder()
.setType(MultipartBody.FORM)
.addFormDataPart("file", fileChunkReq.file.getName(), filePart)
.addFormDataPart("fileName", fileChunkReq.fileName)
.addFormDataPart("fileSize", String.valueOf(fileChunkReq.fileSize))
.addFormDataPart("uuid", fileChunkReq.uuid) //可用于持久化,作为文件存放目录,验证重复上传
.addFormDataPart("chunkNum", String.valueOf(fileChunkReq.chunkNum))
.addFormDataPart("totalChunk", String.valueOf(fileChunkReq.totalChunk))
.build();
Observable<ResponseResult> observable = uploadApi.upload1(requestBody);
return observable;
}
方案二
客服端:
public void uploadTest2(View view) {
countUploadSize = 0;
offset = 0;
long length = file.length();
totalSize = length;
startTime = System.currentTimeMillis();
Observable.create(new ObservableOnSubscribe<FileChunkReq>() {
@Override
public void subscribe(ObservableEmitter<FileChunkReq> emitter) throws Exception {
int blockSize = 1024 * 1024; //1M
while (offset < totalSize) {
//基于偏移量获取块大小,blockSize 可根据服务端调整
byte[] block = FileUtils.getBlock(offset, file, blockSize);
if (block != null) {
FileChunkReq fileChunkReq = new FileChunkReq();
fileChunkReq.fileByte = block;
fileChunkReq.fileName = file.getName();
fileChunkReq.fileSize = totalSize;
fileChunkReq.offset = offset;
fileChunkReq.uuid = UUID;
emitter.onNext(fileChunkReq);
//下一个偏移量
offset += block.length;
}
}
emitter.onComplete();
}
}).flatMap(new Function<FileChunkReq, ObservableSource<ResponseResult>>() {
@Override
public ObservableSource<ResponseResult> apply(FileChunkReq fileChunkReq) throws Exception {
return upload2(fileChunkReq);
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<ResponseResult>() {
@Override
public void accept(ResponseResult responseResult) throws Exception {
showMsg("分块上传方案2responseResult:" + responseResult.toString());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
showMsg("分块上传方案2异常:" + throwable.getMessage());
}
}, new Action() {
@Override
public void run() throws Exception {
showMsg("分块上传方案2完毕");
}
});
}
接口调用
/**
* 分块上传2
*
* @param fileChunkReq
* @return
*/
private ObservableSource<ResponseResult> upload2(FileChunkReq fileChunkReq) {
ByteProgressRequestBody filePart = new ByteProgressRequestBody(fileChunkReq.fileByte, "application/octet-stream", new FileProgressRequestBody.ProgressListener() {
@Override
public void progress(long upload, long totalUpload) {
synchronized (obj) {
showProgress(upload);
Log.w(TAG, "上传方案2,countUploadSize:" + countUploadSize + ",percent:" + percent);
}
}
});
final MultipartBody requestBody = new MultipartBody.Builder()
.setType(MultipartBody.FORM)
.addFormDataPart("file", fileChunkReq.fileName, filePart)
.addFormDataPart("offset", String.valueOf(fileChunkReq.offset))
.addFormDataPart("fileName", fileChunkReq.fileName)
.addFormDataPart("fileSize", String.valueOf(fileChunkReq.fileSize))
.addFormDataPart("uuid", fileChunkReq.uuid)
.build();
Observable<ResponseResult> observable = uploadApi.upload2(requestBody);
return observable;
}
小结
上面两种方案不支持多线程分块上传,必须严格按照分块顺序依次上传,否则影响最终上传文件的完整性,本文只提供思路,大家可基于示例扩展完善,置于选择哪种方案,看具体情况,各有优缺点。
demo地址
GitHub:https://github.com/kellysong/android-blog-demo/tree/master/net-demo