some initial work on cloud sync

This commit is contained in:
Luke Pulverenti
2015-02-05 00:29:37 -05:00
parent 1f1852f3cb
commit 7d415fc2fd
19 changed files with 333 additions and 568 deletions

View File

@@ -2,12 +2,15 @@
using MediaBrowser.Controller.Sync;
using MediaBrowser.Model.Dlna;
using MediaBrowser.Model.Sync;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace MediaBrowser.Server.Implementations.Sync
{
public class CloudSyncProvider : ISyncProvider
public class CloudSyncProvider : IServerSyncProvider
{
private ICloudSyncProvider[] _providers = {};
@@ -35,5 +38,25 @@ namespace MediaBrowser.Server.Implementations.Sync
{
get { return "Cloud Sync"; }
}
public Task<List<string>> GetServerItemIds(string serverId, SyncTarget target, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
public Task DeleteItem(string serverId, string itemId, SyncTarget target, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
public Task TransferItemFile(string serverId, string itemId, string path, SyncTarget target, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
public Task TransferRelatedFile(string serverId, string itemId, string path, ItemFileType type, SyncTarget target, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
}
}

View File

@@ -0,0 +1,174 @@
using MediaBrowser.Common.Progress;
using MediaBrowser.Controller;
using MediaBrowser.Controller.Sync;
using MediaBrowser.Model.Logging;
using MediaBrowser.Model.Sync;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace MediaBrowser.Server.Implementations.Sync
{
public class MediaSync
{
private readonly ISyncManager _syncManager;
private readonly IServerApplicationHost _appHost;
private readonly ILogger _logger;
public MediaSync(ILogger logger, ISyncManager syncManager, IServerApplicationHost appHost)
{
_logger = logger;
_syncManager = syncManager;
_appHost = appHost;
}
public async Task Sync(IServerSyncProvider provider,
SyncTarget target,
IProgress<double> progress,
CancellationToken cancellationToken)
{
var serverId = _appHost.SystemId;
await SyncData(provider, serverId, target, cancellationToken).ConfigureAwait(false);
progress.Report(2);
// Do the data sync twice so the server knows what was removed from the device
await SyncData(provider, serverId, target, cancellationToken).ConfigureAwait(false);
progress.Report(3);
var innerProgress = new ActionableProgress<double>();
innerProgress.RegisterAction(pct =>
{
var totalProgress = pct * .97;
totalProgress += 1;
progress.Report(totalProgress);
});
await GetNewMedia(provider, target, serverId, innerProgress, cancellationToken);
progress.Report(100);
}
private async Task SyncData(IServerSyncProvider provider,
string serverId,
SyncTarget target,
CancellationToken cancellationToken)
{
var localIds = await provider.GetServerItemIds(serverId, target, cancellationToken).ConfigureAwait(false);
var result = await _syncManager.SyncData(new SyncDataRequest
{
TargetId = target.Id,
LocalItemIds = localIds
}).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
foreach (var itemIdToRemove in result.ItemIdsToRemove)
{
try
{
await RemoveItem(provider, serverId, itemIdToRemove, target, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.ErrorException("Error deleting item from sync target. Id: {0}", ex, itemIdToRemove);
}
}
}
private async Task GetNewMedia(IServerSyncProvider provider,
SyncTarget target,
string serverId,
IProgress<double> progress,
CancellationToken cancellationToken)
{
var jobItems = _syncManager.GetReadySyncItems(target.Id);
var numComplete = 0;
double startingPercent = 0;
double percentPerItem = 1;
if (jobItems.Count > 0)
{
percentPerItem /= jobItems.Count;
}
foreach (var jobItem in jobItems)
{
cancellationToken.ThrowIfCancellationRequested();
var currentPercent = startingPercent;
var innerProgress = new ActionableProgress<double>();
innerProgress.RegisterAction(pct =>
{
var totalProgress = pct * percentPerItem;
totalProgress += currentPercent;
progress.Report(totalProgress);
});
await GetItem(provider, target, serverId, jobItem, innerProgress, cancellationToken).ConfigureAwait(false);
numComplete++;
startingPercent = numComplete;
startingPercent /= jobItems.Count;
startingPercent *= 100;
progress.Report(startingPercent);
}
}
private async Task GetItem(IServerSyncProvider provider,
SyncTarget target,
string serverId,
SyncedItem jobItem,
IProgress<double> progress,
CancellationToken cancellationToken)
{
var libraryItem = jobItem.Item;
var internalSyncJobItem = _syncManager.GetJobItem(jobItem.SyncJobItemId);
var fileTransferProgress = new ActionableProgress<double>();
fileTransferProgress.RegisterAction(pct => progress.Report(pct * .92));
await _syncManager.ReportSyncJobItemTransferBeginning(internalSyncJobItem.Id);
var transferSuccess = false;
Exception transferException = null;
try
{
await provider.TransferItemFile(serverId, libraryItem.Id, internalSyncJobItem.OutputPath, target, cancellationToken)
.ConfigureAwait(false);
progress.Report(92);
transferSuccess = true;
progress.Report(99);
}
catch (Exception ex)
{
_logger.ErrorException("Error transferring sync job file", ex);
transferException = ex;
}
if (transferSuccess)
{
await _syncManager.ReportSyncJobItemTransferred(jobItem.SyncJobItemId).ConfigureAwait(false);
}
else
{
await _syncManager.ReportSyncJobItemTransferFailed(jobItem.SyncJobItemId).ConfigureAwait(false);
throw transferException;
}
}
private Task RemoveItem(IServerSyncProvider provider,
string serverId,
string itemId,
SyncTarget target,
CancellationToken cancellationToken)
{
return provider.DeleteItem(serverId, itemId, target, cancellationToken);
}
}
}

View File

@@ -870,6 +870,32 @@ namespace MediaBrowser.Server.Implementations.Sync
await processor.UpdateJobStatus(jobItem.JobId).ConfigureAwait(false);
}
public async Task ReportSyncJobItemTransferBeginning(string id)
{
var jobItem = _repo.GetJobItem(id);
jobItem.Status = SyncJobItemStatus.Transferring;
await UpdateSyncJobItemInternal(jobItem).ConfigureAwait(false);
var processor = GetSyncJobProcessor();
await processor.UpdateJobStatus(jobItem.JobId).ConfigureAwait(false);
}
public async Task ReportSyncJobItemTransferFailed(string id)
{
var jobItem = _repo.GetJobItem(id);
jobItem.Status = SyncJobItemStatus.ReadyToTransfer;
await UpdateSyncJobItemInternal(jobItem).ConfigureAwait(false);
var processor = GetSyncJobProcessor();
await processor.UpdateJobStatus(jobItem.JobId).ConfigureAwait(false);
}
public QueryResult<string> GetLibraryItemIds(SyncJobItemQuery query)
{
return _repo.GetLibraryItemIds(query);