Switch reader implementation to work queues
authorReto Buerki <reet@codelabs.ch>
Wed, 20 May 2015 07:56:33 +0000 (09:56 +0200)
committerReto Buerki <reet@codelabs.ch>
Thu, 11 Jun 2015 07:11:37 +0000 (09:11 +0200)
Instead of creating an explicit kernel thread, use kernel-global work
queue to schedule reader work. This is in preparation of switching the
driver from polling to interrupt mode.

debug.c
internal.h
net.c
reader.c

diff --git a/debug.c b/debug.c
index e28717f..ccde0f7 100644 (file)
--- a/debug.c
+++ b/debug.c
@@ -193,8 +193,6 @@ static int debug_info_open(struct inode *inode, struct file *file)
                buffer_append(&buffer, "reader is enabled\n");
                buffer_append(&buffer, "reader.element_size: %zu\n",
                              dev_info->reader_element_size);
-               buffer_append(&buffer, "reader.active: %d\n",
-                             atomic_read(&dev_info->should_read));
        } else {
                buffer_append(&buffer, "reader is disabled\n");
        }
index 8bec5e9..0c04bc9 100644 (file)
  * device.
  */
 struct dev_info {
-       struct list_head list;             /**< list head for chaining into #dev_list                       */
-       struct net_device *dev;            /**< reference to networking interface                           */
-       struct net_device_stats stats;     /**< contains receive and transmit information                   */
-       char *bus_info;                    /**< text representation for input and output association        */
-       int mtu;                           /**< MTU for this interface                                      */
-       unsigned long flags;               /**< flags given on the command line                             */
-       spinlock_t writer_lock;            /**< lock for accessing the writer part                          */
-       struct muchannel *channel_out;     /**< output channel for write operations                         */
-       size_t writer_element_size;        /**< size of elements for write operations                       */
-       size_t writer_region_size;         /**< total size of writer buffer                                 */
-       u64 writer_protocol;               /**< protocol id for writer                                      */
-       u64 reader_protocol;               /**< protocol id for reader                                      */
-       unsigned int poll_interval;        /**< sleep period for the reader thread                          */
-       struct muchannel_reader reader;    /**< channel reader                                              */
-       size_t reader_element_size;        /**< size of elements for read operations                        */
-       struct muchannel *channel_in;      /**< input channel for read operations                           */
-       struct task_struct *reader_thread; /**< reference to reader thread                                  */
-       atomic_t should_read;              /**< tells the reader thread if reading is active                */
-       wait_queue_head_t read_wq;         /**< wait queue for signalling reader thread about state changes */
-       struct dentry *debugfs_dir;        /**< directory entry for debugfs directory                       */
-       struct dentry *debugfs_info;       /**< directory entry for information file                        */
+       struct list_head list;           /**< list head for chaining into #dev_list                */
+       struct net_device *dev;          /**< reference to networking interface                    */
+       struct net_device_stats stats;   /**< contains receive and transmit information            */
+       char *bus_info;                  /**< text representation for input and output association */
+       int mtu;                         /**< MTU for this interface                               */
+       unsigned long flags;             /**< flags given on the command line                      */
+       spinlock_t writer_lock;          /**< lock for accessing the writer part                   */
+       struct muchannel *channel_out;   /**< output channel for write operations                  */
+       size_t writer_element_size;      /**< size of elements for write operations                */
+       size_t writer_region_size;       /**< total size of writer buffer                          */
+       u64 writer_protocol;             /**< protocol id for writer                               */
+       u64 reader_protocol;             /**< protocol id for reader                               */
+       unsigned int poll_interval;      /**< sleep period for reader work                         */
+       struct muchannel_reader reader;  /**< channel reader                                       */
+       size_t reader_element_size;      /**< size of elements for read operations                 */
+       struct muchannel *channel_in;    /**< input channel for read operations                    */
+       struct delayed_work reader_work; /**< delayed reader work queue struct                     */
+       struct dentry *debugfs_dir;      /**< directory entry for debugfs directory                */
+       struct dentry *debugfs_info;     /**< directory entry for information file                 */
 };
 
 /**
diff --git a/net.c b/net.c
index d0b9b5f..2b2ee6d 100644 (file)
--- a/net.c
+++ b/net.c
@@ -36,7 +36,7 @@
  * with the whole implementation of network operations.
  *
  * The data is transferred via the #muennet_xmit function and received by the
- * #muennet_reader_thread function run as kernel thread.
+ * #muennet_reader_work work queue function.
  */
 /*@{*/
 
@@ -52,8 +52,8 @@ static LIST_HEAD(dev_list);
 /**
  * @brief Setup networking interface link.
  *
- * This function starts the network queue for the given interface and informs
- * the reader thread (if there is one) that data may be received.
+ * This function starts the network queue for the given interface and inserts
+ * the reader work into the events work queue.
  *
  * @param dev networking device to operate on
  * @return always 0 to indicate success
@@ -65,19 +65,15 @@ static int muennet_open(struct net_device *dev)
        writer_up(dev_info);
        netif_start_queue(dev);
 
-       if (dev_info->reader_thread) {
-               /* inform the thread of the changed state */
-               atomic_set(&dev_info->should_read, 1);
-               wake_up(&dev_info->read_wq);
-       }
+       schedule_delayed_work(&dev_info->reader_work, 0);
+
        return 0;
 }
 
 /**
  * @brief Teardown networking interface link.
  *
- * This function stops the network queue for this driver and informs the reader
- * thread (if there is one) that no more packets are to be retrieved.
+ * This function stops the network queue for this driver.
  *
  * @param dev the networking interface
  * @return always returns 0
@@ -88,8 +84,6 @@ static int muennet_close(struct net_device *dev)
 
        netif_stop_queue(dev);
        writer_down(dev_info);
-       if (dev_info->reader_thread)
-               atomic_set(&dev_info->should_read, 0);
 
        return 0;
 }
index 949f9c0..b970ff4 100644 (file)
--- a/reader.c
+++ b/reader.c
@@ -17,9 +17,9 @@
  * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#include <linux/kthread.h>
 #include <linux/ip.h>
 #include <linux/ipv6.h>
+#include <linux/workqueue.h>
 
 #include "internal.h"
 
  * @brief Implementation of reader part.
  *
  * This file implements the functions to setup and cleanup the reader part of
- * the network interface as well as the thread function responsible for polling
- * the channel for new data and injecting them into the network stack.
+ * the network interface as well as the work queue function responsible for
+ * polling the channel for new data and injecting them into the network stack.
  */
 
 /**
  * @brief Cleanup reader.
  *
- * This function frees all previously allocated memory after stopping the
- * reader thread.
+ * This function frees all previously allocated memory and removes delayed
+ * reader work from the work queue.
  *
  * @param dev_info private information stored in the network interface
  */
 void cleanup_reader(struct dev_info *dev_info)
 {
        dev_info->reader_element_size = 0;
-       if (dev_info->reader_thread) {
-               kthread_stop(dev_info->reader_thread);
-               dev_info->reader_thread = NULL;
-       }
+       cancel_delayed_work_sync(&dev_info->reader_work);
+
        iounmap(dev_info->channel_in);
        dev_info->channel_in = NULL;
 }
 
 /**
- * @brief Allocates (or reuses) an skb.
+ * @brief Allocates an skb.
  *
- * This little utility function retrieves a previously allocated skb (that was
- * never used) or allocates a new one if no previous skb is found.
+ * This little utility function allocates a new skb.
  *
- * @param skb      address of skb to reuse or allocate
+ * @param skb      address of skb to allocate
  * @param dev_info contains reader_element_size and flags from which the size
  *                 of the skb is derived
  *
@@ -67,198 +64,169 @@ void cleanup_reader(struct dev_info *dev_info)
 static int get_skb(struct sk_buff **skb, struct dev_info *dev_info)
 {
        size_t size = dev_info->reader_element_size;
+       struct sk_buff *new_skb;
 
        if (dev_info->flags & MUENNET_HDR)
                size += sizeof(struct net_hdr);
 
-       /* only do allocation if actually needed */
-       if (*skb == NULL) {
-               struct sk_buff *new_skb = alloc_skb(size, GFP_KERNEL);
-               if (new_skb == NULL)
-                       return -ENOMEM;
-               *skb = new_skb;
-       }
+       new_skb = alloc_skb(size, GFP_KERNEL);
+       if (new_skb == NULL)
+               return -ENOMEM;
+
+       *skb = new_skb;
        return 0;
 }
 
 /**
  * @ingroup reader
- * @brief Reader thread
+ * @brief Reader work queue function
  *
- * This function will query the memory region for new data. It first waits for
- * the should_read element in the #dev_info to be set. Then it starts reading
- * data. If an overrun is detected it will update the interface statistics.
+ * This work queue function will query the memory region for new data.
  *
  * As long as data is available, it will be read and injected into the
- * networking code. After each processed packet, control is returned to the
- * kernel to allow scheduling.
+ * networking code.
  *
- * If no data is available, the reader thread will pause (interruptible) as long
- * as requested by the #dev_info::poll_interval value.
+ * If no data is available, the reader work will be re-scheduled with a timeout
+ * as specified by the #dev_info::poll_interval value.
  *
- * Access to the reader part of #dev_info is protected with
- * #dev_info::reader_lock.
- *
- * @param data pointer to #dev_info for this networking interface
- * @return always returns 0 (on thread_stop)
+ * @param work work struct used to retrieve #dev_info for this interface
  */
-static int muennet_reader_thread(void *data)
+static void muennet_reader_work(struct work_struct *work)
 {
-       struct dev_info *dev_info = data;
+       struct dev_info *dev_info =
+               container_of(work, struct dev_info, reader_work.work);
        struct sk_buff *skb = NULL;
 
-       netdev_dbg(dev_info->dev, "Starting reader thread\n");
-
-       while (1) {
-               enum muchannel_reader_result result = MUCHANNEL_SUCCESS;
-
-               /* first wait until there is something to do */
-               wait_event_interruptible(dev_info->read_wq,
-                                        atomic_read(&dev_info->should_read) ||
-                                        kthread_should_stop());
-
-               /* check abort conditions */
-               if (kthread_should_stop())
-                       break;
-
-               while (result == MUCHANNEL_SUCCESS ||
-                               result == MUCHANNEL_OVERRUN_DETECTED) {
-                       uint32_t len;
-                       __be16 protocol = 0;
-                       struct iphdr *ipv4_hdr;
-                       struct ipv6hdr *ipv6_hdr;
-
-                       /*
-                        * fetch an skb, either a new one or the previous one if
-                        * still available. re-schedule if no memory is
-                        * available
-                        */
-                       if (get_skb(&skb, dev_info)) {
-                               netdev_warn(dev_info->dev,
-                                           "Failed to allocate skb\n");
-                               break;
-                       }
+       enum muchannel_reader_result result = MUCHANNEL_SUCCESS;
+
+       while (result == MUCHANNEL_SUCCESS ||
+                       result == MUCHANNEL_OVERRUN_DETECTED) {
+               uint32_t len;
+               __be16 protocol = 0;
+               struct iphdr *ipv4_hdr;
+               struct ipv6hdr *ipv6_hdr;
+
+               /*
+                * fetch an skb, either a new one or the previous one if
+                * still available. re-schedule if no memory is
+                * available
+                */
+               if (get_skb(&skb, dev_info)) {
+                       netdev_warn(dev_info->dev,
+                                   "Failed to allocate skb\n");
+                       goto schedule;
+               }
 
-                       result = muen_channel_read(dev_info->channel_in,
-                               &dev_info->reader,
-                               skb->data);
-
-                       if (result == MUCHANNEL_EPOCH_CHANGED) {
-                               /* TODO: Check protocol */
-                               dev_info->reader_element_size = dev_info->reader.size;
-
-                               if (dev_info->reader_element_size > 0x100000) {
-                                       netdev_err(dev_info->dev,
-                                                  "Element size to big %zu\n",
-                                                  dev_info->reader_element_size);
-                                       result = MUCHANNEL_INCOMPATIBLE_INTERFACE;
-                                       dev_info->reader_element_size = 0;
-                                       break;
-                               } else {
-                                       consume_skb(skb);
-                                       skb = NULL;
-
-                                       if (get_skb(&skb, dev_info)) {
-                                               netdev_warn(dev_info->dev,
-                                                           "Failed to allocate skb after consume\n");
-                                               break;
-                                       }
-                               }
-                       }
+               result = muen_channel_read(dev_info->channel_in,
+                                          &dev_info->reader,
+                                          skb->data);
 
-                       /* check if data is present */
-                       if (result == MUCHANNEL_NO_DATA ||
-                                       result == MUCHANNEL_INACTIVE)
-                               goto schedule;
+               if (result == MUCHANNEL_EPOCH_CHANGED) {
+                       /* TODO: Check protocol */
+                       dev_info->reader_element_size = dev_info->reader.size;
 
-                       /* check if overrun by writer */
-                       if (result == MUCHANNEL_OVERRUN_DETECTED) {
-                               netdev_info(dev_info->dev,
-                                           "Reader overrun detected\n");
-                               dev_info->stats.rx_errors++;
-                               dev_info->stats.rx_over_errors++;
+                       if (dev_info->reader_element_size > 0x100000) {
+                               netdev_err(dev_info->dev,
+                                          "Element size to big %zu\n",
+                                          dev_info->reader_element_size);
+                               result = MUCHANNEL_INCOMPATIBLE_INTERFACE;
+                               dev_info->reader_element_size = 0;
                                goto schedule;
-                       }
-
-                       /* check if net_hdr flag is given */
-                       if (dev_info->flags & MUENNET_HDR) {
-                               struct net_hdr *hdr = (struct net_hdr *)skb->data;
-
-                               skb->mark = hdr->mark;
-                               skb_reserve(skb, sizeof(struct net_hdr));
-                               switch (hdr->protocol) {
-                               case IPPROTO_IPIP:
-                                       protocol = htons(ETH_P_IP);
-                                       break;
-                               case IPPROTO_IPV6:
-                                       protocol = htons(ETH_P_IPV6);
-                                       break;
+                       } else {
+                               consume_skb(skb);
+                               skb = NULL;
+
+                               if (get_skb(&skb, dev_info)) {
+                                       netdev_warn(dev_info->dev,
+                                                   "Failed to allocate skb after consume\n");
+                                       goto schedule;
                                }
                        }
+               }
 
-                       /* read the data to determine the protocol */
-                       ipv4_hdr = (void *)skb->data;
-                       ipv6_hdr = (void *)skb->data;
+               /* check if data is present */
+               if (result == MUCHANNEL_NO_DATA ||
+                               result == MUCHANNEL_INACTIVE)
+                       goto schedule;
+
+               /* check if overrun by writer */
+               if (result == MUCHANNEL_OVERRUN_DETECTED) {
+                       netdev_info(dev_info->dev,
+                                   "Reader overrun detected\n");
+                       dev_info->stats.rx_errors++;
+                       dev_info->stats.rx_over_errors++;
+                       goto schedule;
+               }
 
-                       /* determine the payload length */
-                       if (protocol == htons(ETH_P_IP))
-                               len = be16_to_cpu(ipv4_hdr->tot_len);
-                       else if (protocol == htons(ETH_P_IPV6))
-                               len = be16_to_cpu(ipv6_hdr->payload_len) + 40;
-                       else
-                               len = dev_info->reader_element_size;
+               /* check if net_hdr flag is given */
+               if (dev_info->flags & MUENNET_HDR) {
+                       struct net_hdr *hdr = (struct net_hdr *)skb->data;
 
-                       if (len > dev_info->reader_element_size ||
-                               len > skb_tailroom(skb)) {
-                               netdev_warn(dev_info->dev,
-                                           "Invalid length: %u\n",
-                                           (unsigned int)len);
-                               dev_info->stats.rx_errors++;
-                               dev_info->stats.rx_frame_errors++;
-                               goto schedule;
+                       skb->mark = hdr->mark;
+                       skb_reserve(skb, sizeof(struct net_hdr));
+                       switch (hdr->protocol) {
+                       case IPPROTO_IPIP:
+                               protocol = htons(ETH_P_IP);
+                               break;
+                       case IPPROTO_IPV6:
+                               protocol = htons(ETH_P_IPV6);
+                               break;
                        }
+               }
+
+               /* read the data to determine the protocol */
+               ipv4_hdr = (void *)skb->data;
+               ipv6_hdr = (void *)skb->data;
 
-                       /*
-                        * now the skb is ready to be processed, but correct
-                        * some data first
-                        */
-                       skb->dev = dev_info->dev;
-                       skb_put(skb, len);
-                       skb->protocol = protocol;
-
-                       /* process and update stats */
-                       netif_rx_ni(skb);
-                       dev_info->stats.rx_packets++;
-                       dev_info->stats.rx_bytes += skb->len;
-
-                       /* now mark the skb as processed */
-                       skb = NULL;
-
-                       /* immediately try to read the next packet */
-                       continue;
-
-               schedule:
-                       /* allow other threads to run after each
-                        * attempt to read a packet */
-                       schedule();
+               /* determine the payload length */
+               if (protocol == htons(ETH_P_IP))
+                       len = be16_to_cpu(ipv4_hdr->tot_len);
+               else if (protocol == htons(ETH_P_IPV6))
+                       len = be16_to_cpu(ipv6_hdr->payload_len) + 40;
+               else
+                       len = dev_info->reader_element_size;
+
+               if (len > dev_info->reader_element_size ||
+                               len > skb_tailroom(skb)) {
+                       netdev_warn(dev_info->dev,
+                                   "Invalid length: %u\n",
+                                   (unsigned int)len);
+                       dev_info->stats.rx_errors++;
+                       dev_info->stats.rx_frame_errors++;
+                       goto schedule;
                }
 
-               /* schedule with some time to wait */
-               if (dev_info->poll_interval > 0)
-                       schedule_timeout_interruptible(usecs_to_jiffies
-                                       (dev_info->poll_interval * 1000));
+               /*
+                * now the skb is ready to be processed, but correct
+                * some data first
+                */
+               skb->dev = dev_info->dev;
+               skb_put(skb, len);
+               skb->protocol = protocol;
+
+               /* process and update stats */
+               netif_rx_ni(skb);
+               dev_info->stats.rx_packets++;
+               dev_info->stats.rx_bytes += skb->len;
+
+               /* now mark the skb as processed */
+               skb = NULL;
        }
 
-       netdev_dbg(dev_info->dev, "Stopping reader thread\n");
-       return 0;
+schedule:
+       if (skb != NULL)
+               dev_kfree_skb(skb);
+       schedule_delayed_work
+               (&dev_info->reader_work,
+                usecs_to_jiffies(dev_info->poll_interval * 1000));
 }
 
 /**
  * @brief Initialize reader.
  *
  * This function initializes the reader part of the networking interface. It
- * uses the region given as parameter and starts the thread executing the
- * #muennet_reader_thread function.
+ * uses the channel info struct to setup the shared memory channel and
+ * initializes the reader work function.
  *
  * The size of the elements is calculated based on the MTU configured for this
  * networking interface. The number of elements is limited by the maximum
@@ -270,21 +238,16 @@ static int muennet_reader_thread(void *data)
  * @return 0 on success
  * @return -EPERM  if channel is writable
  * @return -EFAULT if ioremap fails
- * @return -ENOMEM if memory allocation failed
- * @return errors returned by kthread_create()
  */
 int initialize_reader(struct dev_info *dev_info,
                      const struct muen_channel_info * const channel)
 {
-       int ret;
-
        if (channel->writable) {
                netdev_err(dev_info->dev, "Reader channel '%s' writable\n",
                           channel->name);
                return -EPERM;
        }
 
-       ret = -ENOMEM;
        dev_info->reader_element_size = 0;
 
        dev_info->channel_in = ioremap_cache(channel->address, channel->size);
@@ -295,20 +258,7 @@ int initialize_reader(struct dev_info *dev_info,
 
        muen_channel_init_reader(&dev_info->reader, dev_info->reader_protocol);
 
-       dev_info->reader_thread = kthread_create
-               (muennet_reader_thread, dev_info, "%s/reader",
-                dev_info->dev->name);
-       if (IS_ERR(dev_info->reader_thread)) {
-               ret = PTR_ERR(dev_info->reader_thread);
-               dev_info->reader_thread = NULL;
-               iounmap(dev_info->channel_in);
-               dev_info->channel_in = NULL;
-               return ret;
-       }
-
-       init_waitqueue_head(&dev_info->read_wq);
-       atomic_set(&dev_info->should_read, 0);
-       wake_up_process(dev_info->reader_thread);
+       INIT_DELAYED_WORK(&dev_info->reader_work, muennet_reader_work);
 
        return 0;
 }