Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
2de6b2e
first steps
Jul 29, 2020
d297753
general structure of SqlTableWatcher fleshed out
Jul 30, 2020
4b43efc
more stuff
Jul 30, 2020
29fb0ce
have the basic flow working
Jul 31, 2020
5058db2
can't get the converter working
Aug 4, 2020
6858371
got the generic conversion working
Aug 4, 2020
fd63294
everything seems to be working now
Aug 5, 2020
473e584
comments and some cleanup
Aug 5, 2020
e4a9629
first steps
Jul 29, 2020
b668144
general structure of SqlTableWatcher fleshed out
Jul 30, 2020
7df5cba
more stuff
Jul 30, 2020
475c37f
have the basic flow working
Jul 31, 2020
3433ec5
can't get the converter working
Aug 4, 2020
43eb627
got the generic conversion working
Aug 4, 2020
5421c0a
everything seems to be working now
Aug 5, 2020
3baf14d
comments and some cleanup
Aug 5, 2020
7c5ee5f
Merge branch 'triggerbinding' of https://github.com/Azure/azure-funct…
Aug 5, 2020
1ca2570
some cleanup
Aug 6, 2020
b705297
addressed some pr comments
Aug 6, 2020
35ff5ae
sanitized sql queries
Aug 7, 2020
cec95d1
added schema and some thread safety stuff
Aug 8, 2020
1f691d3
slight error in last commit
Aug 8, 2020
6d86a7e
addressing some pr comments
Aug 10, 2020
e2e77e1
updated how we poll for changes/renew leases
Aug 11, 2020
5927884
added getting the user table data to the query where we check for cha…
Aug 11, 2020
c6fa2ef
added batching of commands
Aug 12, 2020
78538a1
minor cleanup
Aug 12, 2020
a7b8e45
added exception handling
Aug 12, 2020
d3dc646
added a function for testing, some more clean up
Aug 13, 2020
9c3ce3c
first steps
Jul 29, 2020
a78fe04
more stuff
Jul 30, 2020
14f6dce
have the basic flow working
Jul 31, 2020
dbeae95
can't get the converter working
Aug 4, 2020
13b39be
got the generic conversion working
Aug 4, 2020
e48ff8b
everything seems to be working now
Aug 5, 2020
410d446
comments and some cleanup
Aug 5, 2020
61aae37
some cleanup
Aug 6, 2020
5552966
addressed some pr comments
Aug 6, 2020
c879568
sanitized sql queries
Aug 7, 2020
7144e6a
added schema and some thread safety stuff
Aug 8, 2020
80c1f9f
addressing some pr comments
Aug 10, 2020
e0242fa
added getting the user table data to the query where we check for cha…
Aug 11, 2020
e05689e
added batching of commands
Aug 12, 2020
74b5a67
added exception handling
Aug 12, 2020
86053ee
added a function for testing, some more clean up
Aug 13, 2020
4c079fd
Merge branch 'triggerbinding' of https://github.com/Azure/azure-funct…
Aug 14, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
updated how we poll for changes/renew leases
  • Loading branch information
Sophia Tevosyan committed Aug 11, 2020
commit e2e77e11ffe1b9daba4a60b8994ec71519046b67
214 changes: 103 additions & 111 deletions src/SqlBinding/TriggerBinding/SqlTableWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,16 @@ internal class SqlTableWatcher
private readonly string _connectionStringSetting;
private readonly IConfiguration _configuration;
private readonly ITriggeredFunctionExecutor _executor;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly CancellationTokenSource _cancellationTokenSourceExecutor;
private readonly CancellationTokenSource _cancellationTokenSourceCheckForChanges;
private readonly CancellationTokenSource _cancellationTokenSourceRenewLeases;

// It should be impossible for multiple threads to access these at the same time because of the semaphores we use
private readonly Dictionary<string, string> _primaryKeys;
private readonly List<Dictionary<string, string>> _rows;
private readonly Dictionary<string, string> _queryStrings;
private readonly Timer _renewLeasesTimer;
private readonly Timer _checkForChangesTimer;
private readonly SemaphoreSlim _checkForChangesLock;
private readonly SemaphoreSlim _renewLeasesLock;
private readonly EventWaitHandle _signalStoppedHandle;

// Can't use an enum for these because it doesn't work with the Interlocked class
private int _state;
private const int CheckingForChanges = 0;
private const int ProcessingChanges = 1;
private const int Stopped = 2;
private readonly SemaphoreSlim _leasesLock;
private State _state;

/// <summary>
/// Initializes a new instance of the <see cref="SqlTableWatcher"/> class.
Expand Down Expand Up @@ -79,17 +72,14 @@ public SqlTableWatcher(string table, string connectionStringSetting, IConfigurat
_userTable = SqlBindingUtilities.NormalizeTableName(table);
_workerTable = SqlBindingUtilities.NormalizeTableName(BuildWorkerTableName(table));

_cancellationTokenSource = new CancellationTokenSource();
_cancellationTokenSourceExecutor = new CancellationTokenSource();
_cancellationTokenSourceCheckForChanges = new CancellationTokenSource();
_cancellationTokenSourceRenewLeases = new CancellationTokenSource();
_leasesLock = new SemaphoreSlim(1);

_rows = new List<Dictionary<string, string>>();
_queryStrings = new Dictionary<string, string>();
_primaryKeys = new Dictionary<string, string>();

_checkForChangesTimer = new Timer(CheckForChangesCallback);
_renewLeasesTimer = new Timer(RenewLeasesCallback);
_checkForChangesLock = new SemaphoreSlim(1);
_renewLeasesLock = new SemaphoreSlim(1);
_signalStoppedHandle = new AutoResetEvent(false);

}

/// <summary>
Expand All @@ -99,9 +89,11 @@ public SqlTableWatcher(string table, string connectionStringSetting, IConfigurat
public async Task StartAsync()
{
await CreateWorkerTableAsync();
_checkForChangesTimer.Change(0, SqlTriggerConstants.PollingInterval * 1000);
// Want to renew the leases faster than the lease time expires
_renewLeasesTimer.Change(0, SqlTriggerConstants.LeaseTime * 1000 / 2);
Task.Run(() =>
{
CheckForChangesAsync(_cancellationTokenSourceCheckForChanges.Token);
RenewLeases(_cancellationTokenSourceRenewLeases.Token);
});
}

/// <summary>
Expand All @@ -110,115 +102,106 @@ public async Task StartAsync()
/// <returns></returns>
public async Task StopAsync()
{
// There are three possibilities:
// 1. We haven't started polling for changes yet. In that case, the first time the CheckForChangesCallback executes, the
// "_state == State.CheckingForChanges" if check will fail, and the method will skip directly to the finally clause, where it
// registers the stopped state and disposes the timers
// 2. We have started polling for changes, but are not processing any. The next time the CheckForChangesCallback executes,
// the same steps will be follows as in 1
// 3. We are currently processing changes. Once the CheckForChangesCallback finishes processing changes, it will reach the
// finally clause, register the stopped state, and again dispose the timers
Interlocked.Exchange(ref _state, Stopped);
// Block until the timers have been released
_signalStoppedHandle.WaitOne();
_signalStoppedHandle.Dispose();
_cancellationTokenSourceCheckForChanges.Cancel();
}

/// <summary>
/// Executed once every "_leaseTime" period. If the state of the watcher is ProcessingChanges, then
/// Executed once every <see cref="SqlTriggerConstants.LeaseTime"/> period.
/// If the state of the watcher is <see cref="State.ProcessingChanges"/>, then
/// we will renew the leases held by the watcher on "_rows"
/// </summary>
/// <param name="state">Unused</param>
private async void RenewLeasesCallback(object state)
/// <param name="token">
/// If the token is cancelled, leases are no longer renewed
/// </param>
private async void RenewLeases(CancellationToken token)
{
await _renewLeasesLock.WaitAsync();
try
{
if (_state == ProcessingChanges)
while (!token.IsCancellationRequested)
{
// To prevent useless reinvocation of the callback while it's executing
_renewLeasesTimer.Change(Timeout.Infinite, Timeout.Infinite);
await RenewLeasesAsync();
await _leasesLock.WaitAsync();
try
{
if (_state == State.ProcessingChanges)
{
await RenewLeasesAsync();
}
}
catch (Exception e)
{
// logger here
}
finally
{
// Want to always release the lock at the end, even if renewing the leases failed
_leasesLock.Release();
}
// Want to make sure to renew the leases before they expire, so we renew them twice per
// lease period
await Task.Delay(SqlTriggerConstants.LeaseTime / 2 * 1000, token);
}
}
catch (Exception e)
{
// have a logger here
}
finally
{
// Re-enable timer
_renewLeasesTimer.Change(0, SqlTriggerConstants.LeaseTime * 1000 / 2);
_renewLeasesLock.Release();
// have a logger here. could also be triggered by a TaskCancelledException
}
}

/// <summary>
/// Executed once every "_pollingInterval" period. If the state of the watcher is CheckingForChanges, then
/// Executed once every <see cref="SqlTriggerConstants.PollingInterval"/> period. If the state of the watcher is <see cref="State.CheckingForChanges"/>, then
/// the method query the change/worker tables for changes on the user's table. If any are found, the state of the watcher is
/// transitioned to ProcessingChanges and the user's function is executed with the found changes.
/// If execution is successful, the leases on "_rows" are released and the state transitions to CheckingForChanges
/// transitioned to <see cref="State.ProcessingChanges"/> and the user's function is executed with the found changes.
/// If execution is successful, the leases on "_rows" are released and the state transitions to <see cref="State.CheckingForChanges"/>
/// once more
/// </summary>
/// <param name="state"></param>
private async void CheckForChangesCallback(object state)
/// <param name="token">
/// If the token is cancelled, the thread stops polling for changes
/// </param>
private async Task CheckForChangesAsync(CancellationToken token)
{
await _checkForChangesLock.WaitAsync();
try
{
if (_state == CheckingForChanges)
try {
while (!token.IsCancellationRequested)
{
// To prevent useless reinvocation of the callback while it's executing
_checkForChangesTimer.Change(Timeout.Infinite, Timeout.Infinite);
await CheckForChangesAsync();
if (_rows.Count > 0)
if (_state == State.CheckingForChanges)
{
// If StopAsync has been called, we don't want to change the state from Stopped.
// Rather, it makes sense to immediately release the leases we acquired and dispose the timers
if (Interlocked.CompareExchange(ref _state, ProcessingChanges, CheckingForChanges) == Stopped)
await CheckForChangesAsync();
if (_rows.Count > 0)
{
await ReleaseLeasesAsync();
// Go to the finally block
return;
}
var triggerValue = new ChangeTableData();
_queryStrings.TryGetValue(SqlTriggerConstants.WhereCheck, out string whereCheck);
triggerValue.WorkerTableRows = _rows;
triggerValue.PrimaryKeys = _primaryKeys.Keys;
triggerValue.WhereCheck = whereCheck;
FunctionResult result = await _executor.TryExecuteAsync(new TriggeredFunctionData() { TriggerValue = triggerValue }, _cancellationTokenSource.Token);
if (result.Succeeded)
{
await ReleaseLeasesAsync();
}
else
{
//Should probably have some count for how many times we tried to execute the function. After a certain amount of times
// we should give up
var triggerValue = new ChangeTableData();
_queryStrings.TryGetValue(SqlTriggerConstants.WhereCheck, out string whereCheck);
triggerValue.WorkerTableRows = _rows;
triggerValue.PrimaryKeys = _primaryKeys.Keys;
triggerValue.WhereCheck = whereCheck;
// Should we cancel executing the function if StopAsync is called, or let it finish?
// In other words, should _cancellationTokenSourceCheckingForChanges and _cancellationTokenSourceExecutor
// be one token source?
FunctionResult result = await _executor.TryExecuteAsync(new TriggeredFunctionData() { TriggerValue = triggerValue },
_cancellationTokenSourceExecutor.Token);
if (result.Succeeded)
{
await ReleaseLeasesAsync();
}
else
{
// Should probably have some count for how many times we tried to execute the function. After a certain amount of times
// we should give up
}
if (token.IsCancellationRequested)
{
// Only want to cancel renewing leases after we finish processing the changes
_cancellationTokenSourceRenewLeases.Cancel();
// Might as well skip delaying the task and immediately break out of the while loop
break;
}
}
await Task.Delay(SqlTriggerConstants.PollingInterval * 1000, token);
}
}
}
catch (Exception e)
{
// have a logger here
}
finally
{
if (_state == Stopped)
{
await _checkForChangesTimer.DisposeAsync();
await _renewLeasesTimer.DisposeAsync();
// Signal that resources have been successfully released and timers stopped
_signalStoppedHandle.Set();
}
else
{
// Re-enable timer
_checkForChangesTimer.Change(0, SqlTriggerConstants.PollingInterval * 1000);
_checkForChangesLock.Release();
}
}
}
}

/// <summary>
Expand Down Expand Up @@ -335,7 +318,7 @@ private async Task RenewLeasesAsync()
private async Task ReleaseLeasesAsync()
{
// Don't want to change the _rows while another thread is attempting to renew leases on them
await _renewLeasesLock.WaitAsync();
await _leasesLock.WaitAsync();
try
{
using (SqlConnection connection = SqlBindingUtilities.BuildConnection(_connectionStringSetting, _configuration))
Expand All @@ -356,14 +339,20 @@ private async Task ReleaseLeasesAsync()
}
_rows.Clear();
}
catch (Exception e)
{
// What should we do if releasing the leases fails? We could try to release them again or just wait,
// since eventually the lease time will expire. Then another thread will re-process the same changes though,
// so less than ideal
}
finally
{
// Want to do this before releasing the lock in case the renew leases timer goes off. It will see that
// Want to do this before releasing the lock in case the renew leases thread wakes up. It will see that
// the state is checking for changes and not renew the (just released) leases
// Only want to change the state if it was previously ProcessingChanges though. If it is stopped, for example,
// we don't want to start polling for changes again
Interlocked.CompareExchange(ref _state, CheckingForChanges, ProcessingChanges);
_renewLeasesLock.Release();
_state = State.CheckingForChanges;
_leasesLock.Release();
}
}

Expand Down Expand Up @@ -512,18 +501,15 @@ private SqlCommand BuildRenewLeaseOnRowCommand(Dictionary<string, string> row, S
private SqlCommand BuildReleaseLeaseOnRowCommand(Dictionary<string, string> row, SqlConnection connection, SqlTransaction transaction)
{
SqlCommand releaseLeaseCommand = new SqlCommand();

_queryStrings.TryGetValue(SqlTriggerConstants.WhereCheck, out string whereCheck);
string versionNumber;
row.TryGetValue("SYS_CHANGE_VERSION", out versionNumber);
row.TryGetValue("SYS_CHANGE_VERSION", out string versionNumber);
SqlBindingUtilities.AddPrimaryKeyParametersToCommand(releaseLeaseCommand, row, _primaryKeys.Keys);

var releaseLeaseOnRow =
$"DECLARE @current_version bigint;\n" +
$"SET @current_version = \n" +
$"(SELECT VersionNumber\n" +
$"SELECT @current_version = VersionNumber\n" +
$"FROM {_workerTable}\n" +
$"WHERE {whereCheck});\n" +
$"WHERE {whereCheck};\n" +
$"IF {versionNumber} >= @current_version\n" +
$"UPDATE {_workerTable}\n" +
$"SET LeaseExpirationTime = NULL, DequeueCount = 0, VersionNumber = {versionNumber}\n" +
Expand Down Expand Up @@ -566,5 +552,11 @@ private static string BuildWorkerTableName(string userTable)

return SqlTriggerConstants.Schema + ".Worker_Table_" + tableName;
}

enum State
{
CheckingForChanges,
ProcessingChanges
}
}
}