Skip to content

Commit

Permalink
Encapsulate cloud table together with partition
Browse files Browse the repository at this point in the history
  • Loading branch information
yevhen committed May 29, 2015
1 parent fd26c73 commit d66f3f5
Show file tree
Hide file tree
Showing 23 changed files with 261 additions and 250 deletions.
2 changes: 1 addition & 1 deletion Source/Example/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static void Main()
Console.WriteLine("{0}", scenario.GetType().Name.Replace("_", " "));
Console.WriteLine(new string('-', 40));

scenario.Initialize(table, i);
scenario.Initialize(table, i.ToString());
scenario.Run();

Console.WriteLine();
Expand Down
9 changes: 6 additions & 3 deletions Source/Example/Scenario.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
using System;
using System.Linq;

using Streamstone;
using Microsoft.WindowsAzure.Storage.Table;

namespace Example
{
public abstract class Scenario
{
protected string Id;
protected CloudTable Table;
protected string Partition;
protected Partition Partition;

public void Initialize(CloudTable table, int partition)
public void Initialize(CloudTable table, string id)
{
Id = id;
Table = table;
Partition = partition.ToString();
Partition = new Partition(table, id);
}

public abstract void Run();
Expand Down
4 changes: 2 additions & 2 deletions Source/Example/Scenarios/S01_Provision_new_stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ public class S01_Provision_new_stream : Scenario
{
public override void Run()
{
var stream = Stream.Provision(Table, Partition);
var stream = Stream.Provision(Partition);

Console.WriteLine("Provisioned new empty stream in partition '{0}'", stream.Partition);
Console.WriteLine("Etag: {0}", stream.ETag);
Console.WriteLine("Version: {0}", stream.Version);

var exists = Stream.Exists(Table, Partition);
var exists = Stream.Exists(Partition);
Console.WriteLine("Checking stream exists in a storage: {0}", exists);
}
}
Expand Down
6 changes: 3 additions & 3 deletions Source/Example/Scenarios/S02_Open_stream_for_writing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ void OpenNonExistingStream()
{
try
{
Stream.Open(Table, Partition);
Stream.Open(Partition);
}
catch (StreamNotFoundException)
{
Expand All @@ -27,9 +27,9 @@ void OpenNonExistingStream()

void OpenExistingStream()
{
Stream.Provision(Table, Partition);
Stream.Provision(Partition);

var stream = Stream.Open(Table, Partition);
var stream = Stream.Open(Partition);

Console.WriteLine("Opened existing (empty) stream in partition '{0}'", stream.Partition);
Console.WriteLine("Etag: {0}", stream.ETag);
Expand Down
6 changes: 3 additions & 3 deletions Source/Example/Scenarios/S03_Try_open_stream_for_writing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ public override void Run()

void TryOpenNonExistentStream()
{
var existent = Stream.TryOpen(Table, Partition);
var existent = Stream.TryOpen(Partition);

Console.WriteLine("Trying to open non-existent stream. Found: {0}, Stream: {1}",
existent.Found, existent.Stream == null ? "<null>" : "?!?");
}

void TryOpenExistentStream()
{
Stream.Provision(Table, Partition);
Stream.Provision(Partition);

var existent = Stream.TryOpen(Table, Partition);
var existent = Stream.TryOpen(Partition);

Console.WriteLine("Trying to open existent stream. Found: {0}, Stream: {1}\r\nEtag - {2}, Version - {3}",
existent.Found, existent.Stream, existent.Stream.ETag, existent.Stream.Version);
Expand Down
14 changes: 7 additions & 7 deletions Source/Example/Scenarios/S04_Write_to_stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ public override void Run()

void WriteToExistingOrCreateNewStream()
{
var existent = Stream.TryOpen(Table, Partition);
var existent = Stream.TryOpen(Partition);

var stream = existent.Found
? existent.Stream
: new Stream(Partition);

Console.WriteLine("Writing to new stream in partition '{0}'", stream.Partition);

var result = Stream.Write(Table, stream, new[]
var result = Stream.Write(stream, new[]
{
Event(new InventoryItemCreated(Partition, "iPhone6")),
Event(new InventoryItemCheckedIn(Partition, 100)),
Event(new InventoryItemCreated(Id, "iPhone6")),
Event(new InventoryItemCheckedIn(Id, 100)),
});

Console.WriteLine("Succesfully written to new stream.\r\nEtag: {0}, Version: {1}",
Expand All @@ -38,16 +38,16 @@ void WriteToExistingOrCreateNewStream()

void WriteSequentiallyToExistingStream()
{
var stream = Stream.Open(Table, Partition);
var stream = Stream.Open(Partition);

Console.WriteLine("Writing sequentially to existing stream in partition '{0}'", stream.Partition);
Console.WriteLine("Etag: {0}, Version: {1}", stream.ETag, stream.Version);

for (int i = 1; i <= 10; i++)
{
var result = Stream.Write(Table, stream, new[]
var result = Stream.Write(stream, new[]
{
Event(new InventoryItemCheckedIn(Partition, i*100)),
Event(new InventoryItemCheckedIn(Id, i*100)),
});

Console.WriteLine("Succesfully written event '{0}' under version '{1}'",
Expand Down
6 changes: 3 additions & 3 deletions Source/Example/Scenarios/S05_Read_from_stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ void Prepare()
.Select(Event)
.ToArray();

Stream.Write(Table, new Stream(Partition), events);
Stream.Write(new Stream(Partition), events);
}

void ReadSlice()
{
Console.WriteLine("Reading single slice from specified start version and using specified slice size");

var slice = Stream.Read<EventEntity>(Table, Partition, startVersion: 2, sliceSize: 2);
var slice = Stream.Read<EventEntity>(Partition, startVersion: 2, sliceSize: 2);
foreach (var @event in slice.Events)
Console.WriteLine("{0}: {1}-{2}", @event.Version, @event.Type, @event.Data);

Expand All @@ -46,7 +46,7 @@ void ReadAll()

do
{
slice = Stream.Read<EventEntity>(Table, Partition, nextSliceStart, sliceSize: 1);
slice = Stream.Read<EventEntity>(Partition, nextSliceStart, sliceSize: 1);

foreach (var @event in slice.Events)
Console.WriteLine("{0}:{1} {2}-{3}", @event.Id, @event.Version, @event.Type, @event.Data);
Expand Down
17 changes: 8 additions & 9 deletions Source/Example/Scenarios/S06_Include_additional_entities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,18 @@ public class S06_Include_additional_entities : Scenario
{
public override void Run()
{
var id = Partition;
var stream = new Stream(id);
var stream = new Stream(Partition);

Console.WriteLine("Writing to new stream along with making snapshot in partition '{0}'",
stream.Partition);
stream.Partition);

var events = new[]
{
Event(new InventoryItemCreated(id, "iPhone6")),
Event(new InventoryItemCheckedIn(id, 100)),
Event(new InventoryItemCheckedOut(id, 50)),
Event(new InventoryItemRenamed(id, "iPhone6", "iPhone7")),
Event(new InventoryItemCheckedOut(id, 40))
Event(new InventoryItemCreated(Id, "iPhone6")),
Event(new InventoryItemCheckedIn(Id, 100)),
Event(new InventoryItemCheckedOut(Id, 50)),
Event(new InventoryItemRenamed(Id, "iPhone6", "iPhone7")),
Event(new InventoryItemCheckedOut(Id, 40))
};

var snapshot = Include.InsertOrReplace(new InventoryItemShapshot
Expand All @@ -35,7 +34,7 @@ public override void Run()
Version = events.Length
});

var result = Stream.Write(Table, stream, events, new[]{snapshot});
var result = Stream.Write(stream, events, new[]{snapshot});

Console.WriteLine("Succesfully written to new stream.\r\nEtag: {0}, Version: {1}",
result.Stream.ETag, result.Stream.Version);
Expand Down
26 changes: 13 additions & 13 deletions Source/Example/Scenarios/S07_Custom_stream_metadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ public class S07_Custom_stream_metadata : Scenario
{
public override void Run()
{
SpecifyingDuringProvisioning();
SpecifyingForExistingStream();
SpecifyingDuringWritingToNewStream();
UpdatingForExistingStream();
}

void SpecifyingDuringProvisioning()
void SpecifyingForExistingStream()
{
var partition = Partition + ".a";
var partition = new Partition(Table, Id + ".a");

var properties = new Dictionary<string, EntityProperty>
{
Expand All @@ -27,17 +27,17 @@ void SpecifyingDuringProvisioning()
};

var stream = new Stream(partition, properties);
Stream.Provision(Table, stream);
Stream.Provision(stream);

Console.WriteLine("Stream metadata specified during provisioning in partition '{0}'", partition);

stream = Stream.Open(Table, partition);
stream = Stream.Open(partition);
Print(stream.Properties);
}

void SpecifyingDuringWritingToNewStream()
{
var partition = Partition + ".b";
var partition = new Partition(Table, Id + ".b");

var properties = new Dictionary<string, EntityProperty>
{
Expand All @@ -46,17 +46,17 @@ void SpecifyingDuringWritingToNewStream()
};

var stream = new Stream(partition, properties);
Stream.Write(Table, stream, new[]{new Event("42")});
Stream.Write(stream, new[]{new Event("42")});

Console.WriteLine("Stream metadata specified during writing to new stream in partition '{0}'", partition);

stream = Stream.Open(Table, partition);
stream = Stream.Open(partition);
Print(stream.Properties);
}

void UpdatingForExistingStream()
{
var partition = Partition + ".c";
var partition = new Partition(Table, Id + ".c");

var properties = new Dictionary<string, EntityProperty>
{
Expand All @@ -65,19 +65,19 @@ void UpdatingForExistingStream()
};

var stream = new Stream(partition, properties);
Stream.Provision(Table, stream);
Stream.Provision(stream);

Console.WriteLine("Stream metadata specified for stream in partition '{0}'", partition);

stream = Stream.Open(Table, partition);
stream = Stream.Open(partition);
Print(stream.Properties);

properties["Active"] = new EntityProperty(false);
Stream.SetProperties(Table, stream, properties);
Stream.SetProperties(stream, properties);

Console.WriteLine("Updated stream metadata in partition '{0}'", partition);

stream = Stream.Open(Table, partition);
stream = Stream.Open(partition);
Print(stream.Properties);
}

Expand Down
26 changes: 13 additions & 13 deletions Source/Example/Scenarios/S08_Concurrency_conflicts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ public override void Run()

void SimultaneousProvisioning()
{
Stream.Provision(Table, Partition);
Stream.Provision(Partition);

try
{
Stream.Provision(Table, Partition);
Stream.Provision(Partition);
}
catch (ConcurrencyConflictException)
{
Expand All @@ -33,14 +33,14 @@ void SimultaneousProvisioning()

void SimultaneousWriting()
{
var a = Stream.Open(Table, Partition);
var b = Stream.Open(Table, Partition);
var a = Stream.Open(Partition);
var b = Stream.Open(Partition);

Stream.Write(Table, a, new[]{new Event("123")});
Stream.Write(a, new[]{new Event("123")});

try
{
Stream.Write(Table, b, new[]{new Event("456")});
Stream.Write(b, new[]{new Event("456")});
}
catch (ConcurrencyConflictException)
{
Expand All @@ -50,15 +50,15 @@ void SimultaneousWriting()

void SimultaneousSettingOfStreamMetadata()
{
var a = Stream.Open(Table, Partition);
var b = Stream.Open(Table, Partition);
var a = Stream.Open(Partition);
var b = Stream.Open(Partition);

Stream.SetProperties(Table, a,
Stream.SetProperties(a,
new Dictionary<string, EntityProperty>{{"A", new EntityProperty("42")}});

try
{
Stream.SetProperties(Table, b,
Stream.SetProperties(b,
new Dictionary<string, EntityProperty> {{"A", new EntityProperty("56")}});
}
catch (ConcurrencyConflictException)
Expand All @@ -69,17 +69,17 @@ void SimultaneousSettingOfStreamMetadata()

void SequentiallyWritingToStreamIgnoringReturnedStreamHeader()
{
var stream = Stream.Open(Table, Partition);
var stream = Stream.Open(Partition);

var result = Stream.Write(Table, stream, new[]{new Event("AAA")});
var result = Stream.Write(stream, new[]{new Event("AAA")});

// a new stream header is returned after each write, it contains new Etag
// and it should be used for subsequent operations
// stream = result.Stream;

try
{
Stream.Write(Table, stream, new[]{new Event("BBB")});
Stream.Write(stream, new[]{new Event("BBB")});
}
catch (ConcurrencyConflictException)
{
Expand Down
4 changes: 2 additions & 2 deletions Source/Example/Scenarios/S09_Handling_duplicates.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class S09_Handling_duplicates : Scenario
{
public override void Run()
{
var result = Stream.Write(Table, new Stream(Partition), new[]{new Event(id: "42")});
var result = Stream.Write(new Stream(Partition), new[]{new Event(id: "42")});

try
{
Expand All @@ -19,7 +19,7 @@ public override void Run()
new Event(id: "42") // conflicting (duplicate) event
};

Stream.Write(Table, result.Stream, events);
Stream.Write(result.Stream, events);
}
catch (DuplicateEventException e)
{
Expand Down
Loading

0 comments on commit d66f3f5

Please sign in to comment.