KQL
Kusto Query Language patterns for threat hunting and log analysis in Microsoft Sentinel.
KQL (Kusto Query Language)
KQL is the query language for Azure Log Analytics, Microsoft Sentinel, and Microsoft Defender. It reads top-to-bottom like a pipeline — each operator transforms the result of the previous one.
Core Operators
where — filter rows by condition
SecurityEvent
| where TimeGenerated > ago(1h)
| where EventID == 4625
| where Account !has "SYSTEM"
project — select and rename columns (like SQL SELECT)
SigninLogs
| where TimeGenerated > ago(24h)
| project TimeGenerated, User = UserPrincipalName, IP = IPAddress, Result = ResultType
extend — add calculated columns without removing existing ones
SecurityEvent
| where EventID == 4625
| extend HourOfDay = hourofday(TimeGenerated)
| extend IsWeekend = dayofweek(TimeGenerated) > 5d
summarize — aggregate rows (GROUP BY equivalent)
SecurityEvent
| where EventID == 4625
| summarize FailCount = count() by Account, bin(TimeGenerated, 1h)
| order by FailCount desc
join — combine two tables on a common key
// Correlate failed logins followed by success from same IP
let failures = SecurityEvent | where EventID == 4625 | project FailTime = TimeGenerated, IpAddress, Account;
let successes = SecurityEvent | where EventID == 4624 | project SuccessTime = TimeGenerated, IpAddress, Account;
failures
| join kind=inner successes on IpAddress, Account
| where SuccessTime between (FailTime .. FailTime + 5m)
render — visualize results as charts
SecurityEvent
| where EventID == 4625
| summarize count() by bin(TimeGenerated, 1h)
| render timechart
Time Functions
ago() — relative time from now
| where TimeGenerated > ago(30m) // last 30 minutes
| where TimeGenerated > ago(7d) // last 7 days
| where TimeGenerated > ago(1h) // last hour
between — time range filter
| where TimeGenerated between (datetime(2026-04-09 08:00) .. datetime(2026-04-09 17:00))
bin() — bucket timestamps for aggregation
| summarize count() by bin(TimeGenerated, 5m) // 5-minute buckets
| summarize count() by bin(TimeGenerated, 1h) // hourly buckets
| summarize count() by bin(TimeGenerated, 1d) // daily buckets
format_datetime() — control timestamp display
| extend ReadableTime = format_datetime(TimeGenerated, "yyyy-MM-dd HH:mm:ss")
Aggregation Functions
count, dcount, countif — counting variations
SecurityEvent
| summarize
TotalEvents = count(),
UniqueIPs = dcount(IpAddress),
FailedLogins = countif(EventID == 4625),
SuccessfulLogins = countif(EventID == 4624)
make_set, make_list — collect distinct values or all values into arrays
SigninLogs
| summarize
Locations = make_set(Location),
Apps = make_set(AppDisplayName)
by UserPrincipalName
| where array_length(Locations) > 2
arg_max, arg_min — get the row with the max/min value
// Latest event per computer
SecurityEvent
| summarize arg_max(TimeGenerated, *) by Computer
percentile — statistical distribution
Perf
| summarize p50 = percentile(CounterValue, 50),
p95 = percentile(CounterValue, 95),
p99 = percentile(CounterValue, 99)
by Computer
String Operations
has vs contains vs == — choose the right string operator
// has: word boundary match (fast, tokenized) — preferred
| where Account has "admin"
// contains: substring match (slower, full scan)
| where CommandLine contains "mimikatz"
// ==: exact match (fastest when you know the full value)
| where EventID == 4625
// has_any: match any in a list
| where Process has_any ("powershell.exe", "cmd.exe", "wscript.exe")
// !has: negation
| where Account !has "SYSTEM"
extract — regex field extraction
| extend Domain = extract(@"(\w+)\\(\w+)", 1, Account)
| extend Username = extract(@"(\w+)\\(\w+)", 2, Account)
parse — structured field extraction from strings
// Parse "User evan logged in from 10.50.1.100"
| parse Message with "User " ParsedUser " logged in from " ParsedIP
split — break a string into an array
| extend PathParts = split(FilePath, "\\")
| extend FileName = PathParts[-1]
mv-expand — Explode Arrays into Rows
Expand multi-valued fields for per-element analysis
SigninLogs
| mv-expand ConditionalAccessPolicies
| extend PolicyName = tostring(ConditionalAccessPolicies.displayName)
| extend PolicyResult = tostring(ConditionalAccessPolicies.result)
| where PolicyResult == "failure"
| summarize count() by PolicyName
Common Security Queries
Failed logins by country — identify geographic anomalies
SigninLogs
| where ResultType != "0"
| extend Country = tostring(LocationDetails.countryOrRegion)
| summarize Failures = count() by Country
| order by Failures desc
| take 10
Password spray detection — many accounts, few attempts each, same source
SigninLogs
| where ResultType == "50126" // Invalid username or password
| summarize
TargetCount = dcount(UserPrincipalName),
AttemptCount = count()
by IPAddress, bin(TimeGenerated, 10m)
| where TargetCount > 10 and AttemptCount > 20
Rare process execution — find outliers on a host
SecurityEvent
| where EventID == 4688
| summarize ExecutionCount = count() by Process
| where ExecutionCount == 1
| order by Process asc
Service account logon anomaly — service accounts should not use interactive logon
SecurityEvent
| where EventID == 4624
| where Account has "svc_"
| where LogonType in (2, 10, 11) // Interactive, RemoteInteractive, CachedInteractive
| project TimeGenerated, Computer, Account, LogonType, IpAddress
MFA gap analysis — users authenticating without MFA
SigninLogs
| where ResultType == "0"
| where AuthenticationRequirement == "singleFactorAuthentication"
| summarize count() by UserPrincipalName
| order by count_ desc
| take 20
Let Statements and Variables
Define reusable variables and sub-queries
let timeRange = 24h;
let threshold = 5;
let suspiciousIPs = SigninLogs
| where ResultType != "0"
| summarize Failures = count() by IPAddress
| where Failures > threshold
| project IPAddress;
SigninLogs
| where TimeGenerated > ago(timeRange)
| where IPAddress in (suspiciousIPs)
| where ResultType == "0" // Successful login from previously failing IP
| project TimeGenerated, UserPrincipalName, IPAddress, AppDisplayName
Query Performance Tips
1. Filter early: put 'where' clauses before 'summarize' and 'join' 2. Use 'has' over 'contains' — 'has' uses the term index, 'contains' does full scan 3. Limit time range — ago(1h) is 24x faster than ago(24h) 4. Avoid 'search *' — always specify a table name 5. Use 'project' to drop columns you don't need before joins 6. Use 'take' during development, remove for production rules