Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 88 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ let pusher = PusherBuilder::new_with_client(my_client, "id", "key", "secret").ho

### Triggering events

It is possible to trigger an event on one or more channels. Channel names can contain only characters which are alphanumeric, `_` or `-`` and have to be at most 200 characters long. Event name can be at most 200 characters long too.
It is possible to trigger an event on one or more channels. Channel names can contain only characters which are alphanumeric, `_` or `-`` and have to be at most 200 characters long. Event name can be at most 200 characters long too. It is also possbie to trigger an event to a specific user.


#### Single channel
Expand Down Expand Up @@ -171,13 +171,13 @@ let channels = vec!["test_channel", "test_channel2"];
pusher.trigger_multi(&channels, "my_event", "hello").await;
```

### Excluding event recipients
#### Excluding event recipients

`trigger_exclusive` and `trigger_multi_exclusive` follow the patterns above, except a `socket_id` is given as the last parameter.

These methods allow you to exclude a recipient whose connection has that `socket_id` from receiving the event. You can read more [here](http://pusher.com/docs/duplicates).

#### Examples
###### Examples

**On one channel**:

Expand All @@ -192,7 +192,49 @@ let channels = vec!["test_channel", "test_channel2"];
pusher.trigger_multi_exclusive(&channels, "my_event", "hello", "123.12").await;
```

### Authenticating Channels
#### Send to user

##### `async fn send_to_user<S: serde::Serialize>(&self, user_id: &str, event: &str, payload: S)`

|Argument | Description |
|:-:|:-:|
|user_id `&str`| The id of the user you wish to send an event to.|
|event `&str` | As above.|
|data `S: serde::Serialize` |As above.|

|Return Value|Description|
|:-:|:-:|
|result `Result<TriggeredEvents, String>` | As above. |

###### Example

```rust
let user_id = "10";
pusher.send_to_user(user_id, "my_event", "hello").await;
```

### Terminating user connections

Authenticating a user allows you to terminate all connections for that given user.

##### `async fn terminate_user_connections(&self, user_id: &str)`

|Argument | Description |
|:-:|:-:|
|user_id `&str`| The id of the user whose connections you wish to terminate.|

|Return Value|Description|
|:-:|:-:|
|result `Result<(), String>` | If the request was successful, an `Ok` value will be returned. An `Err` value will be returned if any errors were encountered. |

###### Example

```rust
let user_id = "10";
pusher.terminate_user_connections(user_id).await;
```

### Authentication

Application security is very important so Pusher provides a mechanism for authenticating a user’s access to a channel at the point of subscription.

Expand Down Expand Up @@ -269,6 +311,45 @@ async fn pusher_auth(req: Request<Body>) -> Result<Response<Body>, Error> {
}
```

#### Authenticating users

We can authenticate a user once per connection session. Authenticating a user gives your application access to user based features such as sending events to a user based on user id on terminating a user’s connections immediately.

##### `fn authenticate_user(&self, socket_id: &str, user: &User)`

|Argument|Description|
|:-:|:-:|
|socket_id `&str`| The socket id in the request sent by the client|
|user `&pusher::User`| A struct representing what to assign to a user, consisting of a `id` and any custom `user_info` and `watchlist`. See below |

###### Custom Types

**pusher::User**

```rust
pub struct User<'a> {
pub id: &'a str,
pub user_info: Option<HashMap<&'a str, &'a str>>,
pub watchlist: Option<Vec<&'a str>>,
}
```

###### Example using hyper

```rust
async fn pusher_user_auth(req: Request<Body>) -> Result<Response<Body>, Error> {
let body = to_bytes(req).await.unwrap();
let params = parse(body.as_ref()).into_owned().collect::<HashMap<String, String>>();
let socket_id = params.get("socket_id").unwrap();
let mut user_info = HashMap::new();
user_info.insert("username", "nikhilpatel");
let watchlist = vec!["some-user-id", "some-other-user-id"];
let user = pusher::User {id: "10", user_info: Some(user_info), watchlist: Some(watchlist)};
let auth_signature = pusher.authenticate_user(socket_id, &user).unwrap();
Ok(Response::new(auth_signature.into()))
}
```

### Application state

This library allows you to query our API to retrieve information about your application's channels, their individual properties, and, for presence-channels, the users currently subscribed to them.
Expand Down Expand Up @@ -450,9 +531,12 @@ Feature | Supported
-------------------------------------------| :-------:
Trigger event on single channel | *&#10004;*
Trigger event on multiple channels | *&#10004;*
Trigger event to specific users | *&#10004;*
Excluding recipients from events | *&#10004;*
Authenticating private channels | *&#10004;*
Authenticating presence channels | *&#10004;*
Authenticating users | *&#10004;*
Terminating user connections | *&#10004;*
Get the list of channels in an application | *&#10004;*
Get the state of a single channel | *&#10004;*
Get a list of users in a presence channel | *&#10004;*
Expand Down
149 changes: 147 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,30 @@ impl<C: Connect + Clone + Send + Sync + 'static> Pusher<C> {
.await
}

/// This method allow you to trigger an event to an authenticated user.
///
/// **Example:**
///
/// ```
/// # use pusher::PusherBuilder;
/// # let pusher = PusherBuilder::new("id", "key", "secret").finalize();
/// let user_id = "10";
/// pusher.send_to_user(user_id, "my_event", "hello");
/// ```
pub async fn send_to_user<S: serde::Serialize>(
&self,
user_id: &str,
event: &str,
payload: S,
) -> Result<TriggeredEvents, String> {
if let Err(message) = validate_user_id(user_id) {
return Err(message);
}
let channels = vec![format!("#server-to-user-{}", user_id)];
self._trigger(channels, event, payload, None)
.await
}

async fn _trigger<S: serde::Serialize>(
&self,
channels: Vec<String>,
Expand Down Expand Up @@ -579,6 +603,7 @@ impl<C: Connect + Clone + Send + Sync + 'static> Pusher<C> {
///
/// **Example with hyper**
///
/// ```ignore
/// async fn pusher_auth(req: Request<Body>) -> Result<Response<Body>, Error> {
/// let body = to_bytes(req).await.unwrap();
/// let params = parse(body.as_ref()).into_owned().collect::<HashMap<String, String>>();
Expand Down Expand Up @@ -624,10 +649,102 @@ impl<C: Connect + Clone + Send + Sync + 'static> Pusher<C> {
auth_map.insert("channel_data", json_member);
}

create_channel_auth(&mut auth_map, &self.key, &self.secret, &to_sign);
create_auth_token(&mut auth_map, &self.key, &self.secret, &to_sign);
Ok(serde_json::to_string(&auth_map).unwrap())
}

/// This method allows you to authenticate a user once per connection session.
/// Authenticating a user gives your application access to user based
/// features such as sending events to a user based on user id or terminating
/// a user’s connections immediately.
///
/// **Example with hyper**
///
/// ```ignore
/// async fn pusher_user_auth(req: Request<Body>) -> Result<Response<Body>, Error> {
/// let body = to_bytes(req).await.unwrap();
/// let params = parse(body.as_ref()).into_owned().collect::<HashMap<String, String>>();
/// let socket_id = params.get("socket_id").unwrap();
///
/// let mut user_data = HashMap::new();
/// user_data.insert("username".to_string(), "nikhilpatel".to_string());
/// user_data.insert("group".to_string(), "the-cool-one".to_string());
/// let watchlist = vec!["some-user-id", "some-other-user-id"];
/// let user = pusher::User {
/// id: "10",
/// user_info: Some(user_data),
/// watchlist: Some(watchlist)
/// };
///
/// let auth_signature = pusher.authenticate_user(socket_id, &user).unwrap();
/// Ok(Response::new(auth_signature.into()))
/// }
/// ```
pub fn authenticate_user(
&self,
socket_id: &str,
user: &User,
) -> Result<String, &str> {
let socket_id_regex = Regex::new(r"\A\d+\.\d+\z").unwrap(); // how to make this global?

if !socket_id_regex.is_match(&socket_id) {
return Err("Invalid socket_id");
}

let json_user = serde_json::to_string(user).unwrap();

let to_sign = format!("{}:user:{}", socket_id, json_user);

let mut auth_map = HashMap::new();
auth_map.insert("user_data", json_user);

create_auth_token(&mut auth_map, &self.key, &self.secret, &to_sign);
Ok(serde_json::to_string(&auth_map).unwrap())
}


/// This method allows you to terminate all connections for an authenticated user.
///
/// **Example:**
///
/// ```ignore
/// # use pusher::PusherBuilder;
/// # let pusher = PusherBuilder::new("id", "key", "secret").finalize();
/// let user_id = "10";
/// pusher.terminate_user_connections(user_id);
/// ```
pub async fn terminate_user_connections(
&self,
user_id: &str
) -> Result<(), String> {
if let Err(message) = validate_user_id(user_id) {
return Err(message);
}

let request_url_string = format!(
"{}://{}/users/{}/terminate_connections",
self.scheme(),
self.host,
user_id,
);
let mut request_url = Url::parse(&request_url_string).unwrap();

let body = "".to_string();

let method = "POST";
let query = build_query(
method,
request_url.path(),
&self.key,
&self.secret,
timestamp(),
Some(&body),
None,
);
request_url.set_query(Some(&query));
send_request::<C, ()>(&self.http_client, method, request_url, Some(body)).await
}

/// On your dashboard at http://app.pusher.com, you can set up webhooks to POST a
/// payload to your server after certain events. Such events include channels being
/// occupied or vacated, members being added or removed in presence-channels, or
Expand Down Expand Up @@ -679,10 +796,11 @@ mod tests {
fn test_presence_channel_authentication() {
let pusher =
PusherBuilder::new("id", "278d425bdf160c739803", "7ad3773142a6692b25b8").finalize();
let expected = "{\"auth\":\"278d425bdf160c739803:48dac51d2d7569e1e9c0f48c227d4b26f238fa68e5c0bb04222c966909c4f7c4\",\"channel_data\":\"{\\\"user_id\\\":\\\"10\\\",\\\"user_info\\\":{\\\"name\\\":\\\"Mr. Pusher\\\"}}\"}";
let expected = "{\"auth\":\"278d425bdf160c739803:57a64aa30b116d4d495d6bb56bf187698a3298c3d4959770ffd38cb05bc504fc\",\"channel_data\":\"{\\\"user_id\\\":\\\"10\\\",\\\"user_info\\\":{\\\"clan\\\":\\\"Vikings\\\",\\\"name\\\":\\\"Mr. Pusher\\\"}}\"}";
let expected_encoded: HashMap<String, String> = serde_json::from_str(expected).unwrap();
let mut member_data = HashMap::new();
member_data.insert("name", "Mr. Pusher");
member_data.insert("clan", "Vikings");
let presence_data = Member {
user_id: "10",
user_info: Some(member_data),
Expand All @@ -699,6 +817,33 @@ mod tests {
);
}

#[test]
fn test_user_authentication() {
let pusher =
PusherBuilder::new("id", "278d425bdf160c739803", "7ad3773142a6692b25b8").finalize();
let expected = "{\"auth\":\"278d425bdf160c739803:2a475eafe42c10a641c2ae25156e14d68de2e39135f82fe27cb01c8926af22f8\",\"user_data\":\"{\\\"id\\\":\\\"10\\\",\\\"user_info\\\":{\\\"age\\\":\\\"101\\\",\\\"name\\\":\\\"Mr. Pusher\\\"},\\\"watchlist\\\":[\\\"43\\\",\\\"513\\\",\\\"12\\\"]}\"}";
let expected_encoded: HashMap<String, String> = serde_json::from_str(expected).unwrap();
let mut user_info = HashMap::new();
user_info.insert("name", "Mr. Pusher");
user_info.insert("age", "101");
let watchlist = vec!["43", "513", "12"];
let user = User {
id: "10",
user_info: Some(user_info),
watchlist: Some(watchlist),
};
let result_json =
pusher.authenticate_user("1234.1234", &user);
let result_decoded: HashMap<String, String> =
serde_json::from_str(&result_json.unwrap()).unwrap();

assert_eq!(result_decoded["auth"], expected_encoded["auth"]);
assert_eq!(
result_decoded["user_data"],
expected_encoded["user_data"]
);
}

#[test]
fn test_socket_id_validation() {
let pusher =
Expand Down
18 changes: 18 additions & 0 deletions src/json_structures.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

use crate::util::serde_utils::optional_sorted_map;

#[derive(Serialize)]
pub struct TriggerEventData {
pub name: String,
Expand Down Expand Up @@ -34,7 +36,23 @@ pub struct Member<'a> {
/// Supply an id of the member
pub user_id: &'a str,
/// Supply any optional information to be associated with the member
#[serde(serialize_with = "optional_sorted_map", skip_serializing_if = "Option::is_none")]
pub user_info: Option<HashMap<&'a str, &'a str>>,
}


/// When authenticating user, this represents a particular user.
/// This object becomes associated with that user's subscription.
#[derive(Serialize)]
pub struct User<'a> {
/// Supply an id of the user
pub id: &'a str,
/// Supply any optional information to be associated with the user
#[serde(serialize_with = "optional_sorted_map", skip_serializing_if = "Option::is_none")]
pub user_info: Option<HashMap<&'a str, &'a str>>,
/// Supply optional list of user IDs to allow viewing presence information
#[serde(skip_serializing_if = "Option::is_none")]
pub watchlist: Option<Vec<&'a str>>,
}

/// This is returned upon validating that a webhook is indeed from Pusher,
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,6 @@ mod util;

pub use self::client::{Pusher, PusherBuilder};
pub use self::json_structures::{
Channel, ChannelList, ChannelUser, ChannelUserList, Member, QueryParameters, TriggeredEvents,
Channel, ChannelList, ChannelUser, ChannelUserList, Member, QueryParameters, TriggeredEvents, User,
Webhook,
};
2 changes: 1 addition & 1 deletion src/signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub fn create_body_md5(body: &str) -> String {
result.encode_hex()
}

pub fn create_channel_auth<'a>(
pub fn create_auth_token<'a>(
auth_map: &mut HashMap<&'a str, String>,
key: &str,
secret: &str,
Expand Down
Loading