Notification Rule Expression Policies
Notification rules with bound expression policies can match event context, update related objects, and send custom messages. The following examples show common event-driven automation patterns.
Send a welcome email to newly created users
This expression policy sends a welcome email to a user when their account is created through an enrollment flow, the Admin interface, the API, or a source sync. Instead of creating an administrator notification, it uses ak_send_email to email the new user directly.
Prerequisites
authentik must be able to send email before you can use ak_send_email. Either configure the global email settings, or set up an Email stage and pass it to ak_send_email with the stage argument to use that stage's connection settings.
Create the expression policy
- Log in as an administrator, open the authentik Admin interface, and navigate to Customization > Policies.
- Click Create, select Expression Policy, and click Next.
- Give the policy a name, such as
send-welcome-email. - Paste the expression below into the Expression field.
- Click Finish.
from authentik.core.models import User
event = request.context.get("event")
if not event:
ak_logger.info("no event")
return False
if event.action != "model_created":
ak_logger.info("event action does not match")
return False
model = event.context.get("model", {})
model_app = model.get("app")
model_name = model.get("model_name")
if model_app != "authentik_core" or model_name != "user":
ak_logger.info("model does not match")
return False
user_pk = model.get("pk")
user = User.objects.filter(pk=user_pk).first()
if not user:
ak_logger.info("user not found")
return False
if not user.email:
ak_logger.info("user has no email address")
return False
login_url = "https://authentik.company"
ak_send_email(
user.email,
"Welcome to authentik",
body=(
f"Hello {user.name},\n\n"
f"Your authentik account has been created. You can log in at {login_url}."
),
)
# Return False so the notification rule does not create a notification object.
return False
Bind the policy to a notification rule
The policy only runs when it is bound to a notification rule that triggers on the relevant events.
- Navigate to Event > Notification Rules.
- Click New Notification Rule.
- Give the rule a name, such as
welcome-email. - Leave Send notification to event user disabled, leave Destination group empty, and do not select a notification transport.
- Click Create Notification Rule.
- In the list of notification rules, click the arrow next to the new rule to expand the rule details.
- Click Create or bind.... Under Bind Existing..., select Bind an existing policy.
- In the Create Binding modal, select the
send-welcome-emailpolicy, and then click Create.
This pattern does not require a destination group or transport on the rule. As noted in Notification Rules, bound policies are executed even when no destination is selected. Because this policy returns False, authentik does not create a notification object. The email is sent entirely by the ak_send_email call inside the policy.
Matching on model_created ensures that the email is only sent when the account is first created. Other changes to the user, such as a password reset, are recorded as model_updated and do not trigger this policy.
To send a richer, branded message instead of plain text, replace the body argument with a template argument pointing to a custom email template, and pass any values the template needs through the context argument. See the ak_send_email reference for all available parameters.
To let the new user set their own password, point login_url at your recovery flow, for example https://authentik.company/if/flow/default-recovery-flow/. Replace default-recovery-flow with the slug of the flow you want to use. The user can then enter their email address and receive a password setup link. The flow must exist for the URL to work.
For the built-in ways to send a set-password link, see User credentials recovery.
Trigger alert when user logs in from unknown device
This expression policy matches login events from an unknown device. Use it with a notification rule to alert administrators, or enable Send notification to event user on the rule to alert the user who logged in.
event = request.context.get("event")
if not event:
ak_logger.info("no event")
return False
if event.action != "login":
ak_logger.info("event action does not match")
return False
known_device = event.context.get("auth_method_args", {}).get("known_device")
if known_device is False:
ak_logger.info("user logged in from unknown device")
return True
return False
Change user attributes upon account deactivation
This expression policy runs when a user account with the sshPublicKey attribute is deactivated. It moves the value to inactiveSSHPublicKey, clears sshPublicKey, and saves the updated user attributes.
from authentik.core.models import User
event = request.context.get("event")
if not event:
ak_logger.info("no event")
return False
if event.action != "model_updated":
ak_logger.info("event action does not match")
return False
model = event.context.get("model", {})
model_app = model.get("app")
model_name = model.get("model_name")
if model_app != "authentik_core" or model_name != "user":
ak_logger.info("model does not match")
return False
user_pk = model.get("pk")
user = User.objects.filter(pk=user_pk).first()
if not user:
ak_logger.info("user not found")
return False
if user.is_active:
ak_logger.info("user is active, not changing")
return False
if not user.attributes.get("sshPublicKey"):
ak_logger.info("no public keys to remove")
return False
user.attributes["inactiveSSHPublicKey"] = user.attributes["sshPublicKey"]
user.attributes["sshPublicKey"] = []
user.save(update_fields=["attributes"])
return False
Alert when application is created without binding
This expression policy matches newly created applications that do not have a user, group, or policy binding. Use it with a notification rule to alert an administrator, because an application without bindings can be accessed by all users by default.
from authentik.core.models import Application
from authentik.policies.models import PolicyBinding
event = request.context.get("event")
if not event:
ak_logger.info("no event")
return False
if event.action != "model_created":
ak_logger.info("event action does not match")
return False
model = event.context.get("model", {})
model_app = model.get("app")
model_name = model.get("model_name")
if model_app != "authentik_core" or model_name != "application":
ak_logger.info("model does not match")
return False
application_pk = model.get("pk")
application = Application.objects.filter(pk=application_pk).first()
if not application:
ak_logger.info("application not found")
return False
if PolicyBinding.objects.filter(target=application).exists():
ak_logger.info("application has bindings")
return False
ak_logger.info("application has no bindings")
return True
Append user addition history to group attributes
This expression policy runs when users are added to a group. It appends a timestamped entry to the group's UserAddedHistory attribute for each added user. This information is already available in the group's changelog, but the code can be used as a template for alerts or other event-driven updates.
This policy interacts with the diff event output. This field is only available with an enterprise license.
from django.utils import timezone
from authentik.core.models import Group, User
event = request.context.get("event")
if not event:
ak_logger.info("no event")
return False
if event.action != "model_updated":
ak_logger.info("event action does not match")
return False
model = event.context.get("model", {})
model_app = model.get("app")
model_name = model.get("model_name")
if model_app != "authentik_core" or model_name != "group":
ak_logger.info("model does not match")
return False
group_pk = model.get("pk")
group = Group.objects.filter(pk=group_pk).first()
if not group:
ak_logger.info("group not found")
return False
added_user_pks = event.context.get("diff", {}).get("users", {}).get("add", [])
if not added_user_pks:
ak_logger.info("user not added to group")
return False
if not group.attributes.get("UserAddedHistory"):
group.attributes["UserAddedHistory"] = []
current_date_time = timezone.now().isoformat(timespec="seconds")
users = User.objects.filter(pk__in=added_user_pks)
for user in users:
group.attributes["UserAddedHistory"].append(
f"{current_date_time} - Added user: {user.username}"
)
group.save(update_fields=["attributes"])
return False