Stream transforms in Coral8 via .NET
In the first post on integrating PowerShell and Coral8, I showed how to create a message sink. In C#-ish pseudocode, it looked like this:
OutputStream<T> => Action<T>
where Action<T> was a PowerShell block that sent a message via Gmail.
Today the goal is a transform, something along these lines:
OutputStream<T> => Func<T,T2> => InputStream<T2>
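To make the difference between the two shapes concrete, here is a minimal C# sketch that models a stream as a plain IEnumerable<T>. That modeling, and the StreamShapes, Sink, and Transform names, are assumptions for illustration only; they are not the Coral8 or PoShAdapter API. A sink consumes every tuple and produces nothing, while a transform maps every tuple and yields a new stream.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helpers: streams are modeled as IEnumerable<T>, not real Coral8 streams.
public static class StreamShapes
{
    // OutputStream<T> => Action<T>: each tuple is consumed, nothing flows onward.
    public static void Sink<T>(IEnumerable<T> output, Action<T> action)
    {
        foreach (var t in output)
            action(t);
    }

    // OutputStream<T> => Func<T,T2> => InputStream<T2>: each tuple is mapped,
    // and the results form a new stream that can be fed back into the engine.
    public static IEnumerable<T2> Transform<T, T2>(IEnumerable<T> output, Func<T, T2> func)
    {
        return output.Select(func);
    }
}
```

In LINQ terms, the sink is a foreach over an Action and the transform is a Select over a Func.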
In this example, T is a pair of stock symbols, and T2 is the same pair along with the correlation of their log returns over the past 10 trading days (10 adjusted closing prices, hence 9 returns).
Let’s get started.
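Written out, with P_0, ..., P_9 the ten adjusted closes in the order the CSV returns them, and a_i, b_i the log returns of SecA and SecB, the Get-LogReturns and Correlate code below computes the following (n = 9). The second line is the sums form of the sample Pearson correlation, algebraically equivalent to the mean-centred definition; the code returns 0 when the denominator is 0.

```latex
r_i = \ln\frac{P_i}{P_{i+1}}, \qquad i = 0, 1, \dots, 8

\rho_{AB} = \frac{\sum_i a_i b_i - \frac{1}{n}\sum_i a_i \sum_i b_i}
  {\sqrt{\Bigl(\sum_i a_i^2 - \frac{1}{n}\bigl(\sum_i a_i\bigr)^2\Bigr)\Bigl(\sum_i b_i^2 - \frac{1}{n}\bigl(\sum_i b_i\bigr)^2\Bigr)}}
```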
```
CREATE SCHEMA StockPairSchema (
    SecA STRING,
    SecB STRING
);

CREATE SCHEMA StockPairCorrelationSchema INHERITS from StockPairSchema (
    CORR FLOAT
);

CREATE OUTPUT STREAM StockPairs SCHEMA StockPairSchema;
CREATE INPUT STREAM StockCorrelations SCHEMA StockPairCorrelationSchema;

ATTACH OUTPUT ADAPTER PairstoCorrelationsFunc TYPE PoShAdapter
    TO STREAM StockPairs
    PROPERTIES
        RESULTSSTREAM = "ccl://localhost:6789/Stream/Default/TestC8/StockCorrelations",
```
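The process block is simple: for each incoming tuple it fetches the log returns of both symbols, correlates them, and emits a (SecA, SecB, CORR) row: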
```powershell
foreach ($t in $input)
{
    $a = Get-LogReturns $t["SecA"]
    $b = Get-LogReturns $t["SecB"]
    $c = [Demo.Stats]::Correlate($a, $b)
    ,,($t["SecA"], $t["SecB"], $c)   # double commas so the values are not flattened
}
```
The block path defines the Get-LogReturns function and the Correlate method it relies on, so naturally it’s a bit longer.
Get-LogReturns:
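```powershell
$wc = New-Object Net.WebClient

function Get-LogReturns ($sec)
{
    # Historical price table for one symbol, as CSV
    $qry = "http://ichart.finance.yahoo.com/table.csv?s=$sec"

    # Keep the adjusted close from the first 10 rows
    $secAData = ConvertFrom-Csv $wc.DownloadString($qry) |
        select -First 10 |
        % { [double]$_.'Adj Close' }

    # One log return per consecutive pair of closes: 10 closes -> 9 returns
    for ($i = 0; $i -lt $secAData.Count - 1; $i++)
    {
        [Math]::Log( $secAData[$i] / $secAData[$i+1] )
    }
}
```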
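For comparison, the same calculation can be sketched in C# as a pure function over closes that have already been downloaded and parsed. The download and CSV parsing are omitted, and the ReturnsSketch and LogReturns names are hypothetical. The sketch makes the off-by-one explicit: each return needs a pair of consecutive prices, so 10 closes yield 9 returns.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper mirroring the PowerShell loop in Get-LogReturns.
public static class ReturnsSketch
{
    // Take the first `count` closes in feed order and return
    // ln(close[i] / close[i + 1]) for each consecutive pair.
    public static double[] LogReturns(IEnumerable<double> closes, int count = 10)
    {
        double[] p = closes.Take(count).ToArray();
        var result = new double[Math.Max(p.Length - 1, 0)];
        for (int i = 0; i < result.Length; i++)
            result[i] = Math.Log(p[i] / p[i + 1]);
        return result;
    }
}
```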
[Demo.Stats]::Correlate:
```powershell
$csCode = @"
using System;
using System.Collections.Generic;
using System.Linq;

namespace Demo
{
    public static class Stats
    {
        // Pairs up two lists element by element, stopping at the shorter one
        private static IEnumerable<TResult> Zip<TFirst, TSecond, TResult>(
            IList<TFirst> first, IList<TSecond> second, Func<TFirst, TSecond, TResult> func)
        {
            for (int ii = 0; ii < Math.Min(first.Count(), second.Count()); ii++)
                yield return func(first[ii], second[ii]);
        }

        // Entry point for PowerShell, which collects pipeline output as object[]
        public static double Correlate(object[] s1, object[] s2)
        {
            return Correlate(s1.Cast<double>(), s2.Cast<double>());
        }

        // Pearson correlation computed from sums, sums of squares and the sum of products
        public static double Correlate(IEnumerable<double> s1, IEnumerable<double> s2)
        {
            var sum1 = s1.Sum();
            var sum2 = s2.Sum();
            var sumSq1 = s1.Select(v => v * v).Sum();
            var sumSq2 = s2.Select(v => v * v).Sum();
            var pSum = Zip(s1.ToList(), s2.ToList(), (a, b) => a * b).Sum();
            var len = s1.Count();
            var num = pSum - ((sum1 * sum2) / len);
            var denom = Math.Sqrt((sumSq1 - (sum1 * sum1) / len) * (sumSq2 - (sum2 * sum2) / len));
            return (denom == 0.0) ? 0 : num / denom;
        }
    }
}
"@

Add-Type -TypeDefinition $csCode -Language CSharpVersion3 -PassThru
```
In case you missed it, Correlate isn’t written in PowerShell at all: it is inline C# that Add-Type compiles when the adapter starts up, and it runs inside the Coral8 server process.
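Presumably the hand-rolled Zip helper is there because Enumerable.Zip did not exist before .NET 4.0. On .NET 4.0 or later, the same sums-based Pearson formula can lean on the built-in operator. The following is a sketch, not a drop-in replacement for the adapter code; the StatsSketch name is hypothetical, and unlike the original it truncates both series to their common length so every sum covers the same points.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical .NET 4+ variant of Demo.Stats: same formula, built-in Enumerable.Zip.
public static class StatsSketch
{
    public static double Correlate(IEnumerable<double> s1, IEnumerable<double> s2)
    {
        // Zip stops at the shorter series, so all sums below run over the same n points.
        var pairs = s1.Zip(s2, (x, y) => new { x, y }).ToList();
        int n = pairs.Count;
        if (n == 0) return 0.0;

        double sum1 = pairs.Sum(p => p.x);
        double sum2 = pairs.Sum(p => p.y);
        double sumSq1 = pairs.Sum(p => p.x * p.x);
        double sumSq2 = pairs.Sum(p => p.y * p.y);
        double pSum = pairs.Sum(p => p.x * p.y);

        double num = pSum - (sum1 * sum2) / n;
        double denom = Math.Sqrt((sumSq1 - sum1 * sum1 / n) * (sumSq2 - sum2 * sum2 / n));

        // Same convention as the original: a flat series yields 0 rather than NaN.
        return denom == 0.0 ? 0.0 : num / denom;
    }
}
```

Materializing the pairs once also avoids re-enumerating the inputs for every sum, which the original does five times per series.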